Add spring cloud stream rocketmq thread pool docking (#215)

pull/233/head
chen.ma 2 years ago
parent 7d014306c4
commit 29baf830e4

@ -17,6 +17,8 @@
package cn.hippo4j.adapter.base;
import java.util.List;
/**
* Adapt to the thread pool of the third-party framework.
*/
@ -32,10 +34,19 @@ public interface ThreadPoolAdapter {
/**
* Get the core parameters of the framework thread pool.
*
* @param identify {@link ThreadPoolAdapter#mark} + Thread pool unique id
* @param identify Thread pool unique id
* @return
*/
ThreadPoolAdapterState getThreadPoolState(String identify);
/**
* Get the core parameters of the framework thread pool.
*
* @return
*/
ThreadPoolAdapterState getThreadPoolStateInfo(String identify);
default List<ThreadPoolAdapterState> getThreadPoolStates() {
return null;
}
/**
* Modify the core parameters of the framework thread pool.

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.adapter.base;
import lombok.Data;
import java.util.List;
/**
* Thread-pool adapter cache config.
*/
@Data
public class ThreadPoolAdapterCacheConfig {
/**
* Mark
*/
private String mark;
/**
* Tenant item key
*/
private String tenantItemKey;
/**
* Client identify
*/
private String clientIdentify;
/**
* Active
*/
private String active;
/**
* Client address
*/
private String clientAddress;
/**
* Thread-pool adapter states
*/
private List<ThreadPoolAdapterState> threadPoolAdapterStates;
}

@ -26,22 +26,22 @@ import lombok.Data;
public class ThreadPoolAdapterParameter {
/**
* mark
* Mark
*/
private String mark;
/**
* identify
* Thread-pool key
*/
private String identify;
private String threadPoolKey;
/**
* Core size.
* Core size
*/
private Integer coreSize;
/**
* Maximum size.
* Maximum size
*/
private Integer maximumSize;
}

@ -26,12 +26,42 @@ import lombok.Data;
public class ThreadPoolAdapterState {
/**
* Core size.
* Thread-pool keu
*/
private String threadPoolKey;
/**
* Active
*/
private String active;
/**
* identify
*/
private String identify;
/**
* Client address
*/
private String clientAddress;
/**
* Core size
*/
private Integer coreSize;
/**
* Maximum size.
* Maximum size
*/
private Integer maximumSize;
/**
* Blocking queue type
*/
private String blockingQueueType;
/**
* Blocking queue capacity
*/
private Integer blockingQueueCapacity;
}

@ -25,6 +25,7 @@ import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ReflectUtil;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@ -37,12 +38,12 @@ import org.springframework.cloud.stream.binding.InputBindingLifecycle;
import org.springframework.context.ApplicationListener;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
/**
* Spring cloud stream rocketMQ thread-pool adapter.
@ -58,10 +59,12 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
}
@Override
public ThreadPoolAdapterState getThreadPoolStateInfo(String identify) {
public ThreadPoolAdapterState getThreadPoolState(String identify) {
ThreadPoolAdapterState result = new ThreadPoolAdapterState();
ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(identify);
if (rocketMQConsumeExecutor != null) {
result.setThreadPoolKey(mark());
result.setThreadPoolKey(identify);
result.setCoreSize(rocketMQConsumeExecutor.getCorePoolSize());
result.setMaximumSize(rocketMQConsumeExecutor.getMaximumPoolSize());
return result;
@ -70,22 +73,30 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
return result;
}
@Override
public List<ThreadPoolAdapterState> getThreadPoolStates() {
List<ThreadPoolAdapterState> adapterStateList = Lists.newArrayList();
ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.forEach(
(key, val) -> adapterStateList.add(getThreadPoolState(key)));
return adapterStateList;
}
@Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
String identify = threadPoolAdapterParameter.getIdentify();
ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(identify);
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(threadPoolKey);
if (rocketMQConsumeExecutor != null) {
int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize();
int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize();
rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCoreSize());
rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumSize());
log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}",
identify,
threadPoolKey,
String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()),
String.format(CHANGE_DELIMITER, originalMaximumPoolSize, rocketMQConsumeExecutor.getMaximumPoolSize()));
return true;
}
log.warn("[{}] RocketMQ consuming thread pool not found.", identify);
log.warn("[{}] RocketMQ consuming thread pool not found.", threadPoolKey);
return false;
}
@ -99,7 +110,6 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
try {
for (Binding<Object> each : inputBindings) {
String bindingName = each.getBindingName();
String buildKey = mark() + IDENTIFY_SLICER_SYMBOL + bindingName;
DefaultBinding defaultBinding = (DefaultBinding) each;
RocketMQInboundChannelAdapter lifecycle = (RocketMQInboundChannelAdapter) ReflectUtil.getFieldValue(defaultBinding, "lifecycle");
RocketMQListenerBindingContainer rocketMQListenerContainer = (RocketMQListenerBindingContainer) ReflectUtil.getFieldValue(lifecycle, "rocketMQListenerContainer");
@ -107,7 +117,7 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = consumer.getDefaultMQPushConsumerImpl();
ConsumeMessageConcurrentlyService consumeMessageService = (ConsumeMessageConcurrentlyService) defaultMQPushConsumerImpl.getConsumeMessageService();
ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor");
ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(buildKey, consumeExecutor);
ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(bindingName, consumeExecutor);
}
} catch (Exception ex) {
log.error("Failed to get input-bindings thread pool.", ex);

@ -67,6 +67,10 @@ public class Constants {
public static final String MONITOR_PATH = BASE_PATH + "/monitor";
public static final String REGISTER_ADAPTER_BASE_PATH = BASE_PATH + "/adapter/thread-pool";
public static final String REGISTER_ADAPTER_PATH = REGISTER_ADAPTER_BASE_PATH + "/register";
public static final String HEALTH_CHECK_PATH = BASE_PATH + "/health/check";
public static final String PROBE_MODIFY_REQUEST = "Listening-Configs";

@ -64,5 +64,10 @@
<groupId>cn.hippo4j</groupId>
<artifactId>log-record-tool</artifactId>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter-base</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.controller;
import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.service.ThreadPoolAdapterService;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH;
/**
* Thread-pool adapter controller.
*/
@RestController
@AllArgsConstructor
public class ThreadPoolAdapterController {
private final ThreadPoolAdapterService threadPoolAdapterService;
@PostMapping(REGISTER_ADAPTER_PATH)
public Result registerAdapterThreadPool(@RequestBody List<ThreadPoolAdapterCacheConfig> requestParameter) {
threadPoolAdapterService.register(requestParameter);
return Results.success();
}
}

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.model.biz.adapter;
import lombok.Data;
import java.util.List;
/**
* Thread-pool adapter req DTO.
*/
@Data
public class ThreadPoolAdapterReqDTO {
/**
* Mark
*/
private String mark;
/**
* Tenant
*/
private String tenant;
/**
* Item
*/
private String item;
/**
* Thread pool key
*/
private String threadPoolKey;
/**
* Identify
*/
private String identify;
/**
* Core size
*/
private Integer coreSize;
/**
* Maximum size
*/
private Integer maximumSize;
/**
* Client address list
*/
private List<String> clientAddressList;
}

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.model.biz.adapter;
import lombok.Data;
/**
* Thread-pool adapter resp DTO.
*/
@Data
public class ThreadPoolAdapterRespDTO {
/**
* Identify
*/
private String identify;
/**
* Active
*/
private String active;
/**
* Client address
*/
private String clientAddress;
/**
* Thread pool key
*/
private String threadPoolKey;
/**
* Core size
*/
private Integer coreSize;
/**
* Maximum size
*/
private Integer maximumSize;
}

@ -114,7 +114,7 @@ public class LongPollingService {
@Override
public void run() {
try {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext();) {
ClientLongPolling clientSub = iter.next();
String identity = groupKey + GROUP_KEY_DELIMITER + identify;
List<String> parseMapForFilter = Lists.newArrayList(identity);

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.service;
import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO;
import cn.hutool.core.text.StrBuilder;
import cn.hutool.http.HttpUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static cn.hippo4j.common.constant.Constants.HTTP_EXECUTE_TIMEOUT;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
/**
* Thread-pool adapter service.
*/
@Slf4j
@Service
public class ThreadPoolAdapterService {
/**
* Map<mark, Map<tenantItem, Map<threadPoolKey, List<String>>>>
*/
private final Map<String, Map<String, Map<String, List<String>>>> THREAD_POOL_ADAPTER_MAP = Maps.newConcurrentMap();
public synchronized void register(List<ThreadPoolAdapterCacheConfig> requestParameter) {
for (ThreadPoolAdapterCacheConfig each : requestParameter) {
String mark = each.getMark();
Map<String, Map<String, List<String>>> actual = THREAD_POOL_ADAPTER_MAP.get(mark);
if (CollectionUtil.isEmpty(actual)) {
actual = Maps.newHashMap();
THREAD_POOL_ADAPTER_MAP.put(mark, actual);
}
Map<String, List<String>> tenantItemMap = actual.get(each.getTenantItemKey());
if (CollectionUtil.isEmpty(tenantItemMap)) {
tenantItemMap = Maps.newHashMap();
actual.put(each.getTenantItemKey(), tenantItemMap);
}
List<ThreadPoolAdapterState> threadPoolAdapterStates = each.getThreadPoolAdapterStates();
for (ThreadPoolAdapterState adapterState : threadPoolAdapterStates) {
List<String> threadPoolKeyList = tenantItemMap.get(adapterState.getThreadPoolKey());
if (CollectionUtil.isEmpty(threadPoolKeyList)) {
threadPoolKeyList = Lists.newArrayList();
tenantItemMap.put(adapterState.getThreadPoolKey(), threadPoolKeyList);
}
threadPoolKeyList.add(each.getClientAddress());
}
}
}
public List<ThreadPoolAdapterRespDTO> query(ThreadPoolAdapterReqDTO requestParameter) {
List<String> actual = Optional.ofNullable(THREAD_POOL_ADAPTER_MAP.get(requestParameter.getMark()))
.map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem()))
.map(each -> each.get(requestParameter.getThreadPoolKey()))
.orElse(Lists.newArrayList());
List<ThreadPoolAdapterRespDTO> result = Lists.newCopyOnWriteArrayList();
actual.parallelStream().forEach(each -> {
String urlString = StrBuilder.create("http://", each, "/adapter/thread-pool/info").toString();
Map<String, Object> param = Maps.newHashMap();
param.put("mark", requestParameter.getMark());
param.put("threadPoolKey", requestParameter.getThreadPoolKey());
try {
String resultStr = HttpUtil.get(urlString, param, HTTP_EXECUTE_TIMEOUT);
if (StringUtil.isNotBlank(resultStr)) {
Result<ThreadPoolAdapterRespDTO> restResult = JSONUtil.parseObject(resultStr, Result.class);
result.add(restResult.getData());
}
} catch (Throwable ex) {
log.error("Failed to get third-party thread pool data.", ex);
}
});
return result;
}
}

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.console.controller;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO;
import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO;
import cn.hippo4j.config.service.ThreadPoolAdapterService;
import cn.hutool.core.text.StrBuilder;
import cn.hutool.http.HttpUtil;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import static cn.hippo4j.common.constant.Constants.HTTP_EXECUTE_TIMEOUT;
import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_BASE_PATH;
/**
* Thread-pool adapter controller.
*/
@AllArgsConstructor
@RestController("threadPoolAdapterConsoleController")
public class ThreadPoolAdapterController {
private final ThreadPoolAdapterService threadPoolAdapterService;
@GetMapping(REGISTER_ADAPTER_BASE_PATH + "/query")
public Result<List<ThreadPoolAdapterRespDTO>> queryAdapterThreadPool(ThreadPoolAdapterReqDTO requestParameter) {
List<ThreadPoolAdapterRespDTO> result = threadPoolAdapterService.query(requestParameter);
return Results.success(result);
}
@PostMapping(REGISTER_ADAPTER_BASE_PATH + "/update")
public Result<Void> updateAdapterThreadPool(@RequestBody ThreadPoolAdapterReqDTO requestParameter) {
for (String each : requestParameter.getClientAddressList()) {
String urlString = StrBuilder.create("http://", each, "/adapter/thread-pool/update").toString();
HttpUtil.post(urlString, JSONUtil.toJSONString(requestParameter), HTTP_EXECUTE_TIMEOUT);
}
return Results.success();
}
}

@ -33,6 +33,7 @@ import cn.hippo4j.springboot.starter.controller.WebThreadPoolController;
import cn.hippo4j.springboot.starter.controller.WebThreadPoolRunStateController;
import cn.hippo4j.springboot.starter.core.*;
import cn.hippo4j.springboot.starter.event.ApplicationContentPostProcessor;
import cn.hippo4j.springboot.starter.core.ThreadPoolAdapterRegister;
import cn.hippo4j.springboot.starter.monitor.ReportingEventExecutor;
import cn.hippo4j.springboot.starter.monitor.collect.RunTimeInfoCollector;
import cn.hippo4j.springboot.starter.monitor.send.HttpConnectSender;
@ -139,8 +140,9 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
public ThreadPoolAdapterController threadPoolAdapterController() {
return new ThreadPoolAdapterController();
@SuppressWarnings("all")
public ThreadPoolAdapterController threadPoolAdapterController(InetUtils hippo4JInetUtils) {
return new ThreadPoolAdapterController(environment, hippo4JInetUtils);
}
@Bean
@ -154,7 +156,14 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
@SuppressWarnings("all")
public WebThreadPoolController webThreadPoolController(WebThreadPoolHandlerChoose webThreadPoolServiceChoose) {
return new WebThreadPoolController(webThreadPoolServiceChoose);
}
@Bean
@SuppressWarnings("all")
public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils) {
return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils);
}
}

@ -19,9 +19,16 @@ package cn.hippo4j.springboot.starter.controller;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@ -33,12 +40,34 @@ import static cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer.THREAD_POOL
/**
* Thread-pool adapter controller.
*/
@Slf4j
@RestController
@AllArgsConstructor
public class ThreadPoolAdapterController {
@PostMapping("/update/adapter/thread-pool")
private final ConfigurableEnvironment environment;
private final InetUtils hippo4JInetUtils;
@GetMapping("/adapter/thread-pool/info")
public Result<ThreadPoolAdapterState> getAdapterThreadPool(ThreadPoolAdapterParameter requestParameter) {
ThreadPoolAdapter threadPoolAdapter = THREAD_POOL_ADAPTER_BEAN_CONTAINER.get(requestParameter.getMark());
ThreadPoolAdapterState result = Optional.ofNullable(threadPoolAdapter).map(each -> {
ThreadPoolAdapterState threadPoolState = each.getThreadPoolState(requestParameter.getThreadPoolKey());
String active = environment.getProperty("spring.profiles.active", "UNKNOWN");
threadPoolState.setActive(active);
String clientAddress = CloudCommonIdUtil.getDefaultInstanceId(environment, hippo4JInetUtils);
threadPoolState.setClientAddress(clientAddress);
threadPoolState.setIdentify(IdentifyUtil.getIdentify());
return threadPoolState;
}).orElse(null);
return Results.success(result);
}
@PostMapping("/adapter/thread-pool/update")
public Result<Void> updateAdapterThreadPool(@RequestBody ThreadPoolAdapterParameter requestParameter) {
log.info("[{}] Change third-party thread pool data. key: {}, coreSize: {}, maximumSize: {}",
requestParameter.getMark(), requestParameter.getThreadPoolKey(), requestParameter.getCoreSize(), requestParameter.getMaximumSize());
ThreadPoolAdapter threadPoolAdapter = THREAD_POOL_ADAPTER_BEAN_CONTAINER.get(requestParameter.getMark());
Optional.ofNullable(threadPoolAdapter).ifPresent(each -> each.updateThreadPool(requestParameter));
return Results.success();

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
import cn.hippo4j.springboot.starter.remote.HttpAgent;
import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.env.ConfigurableEnvironment;
import java.util.List;
import java.util.Map;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH;
/**
* Thread-pool adapter register.
*/
@Slf4j
@AllArgsConstructor
public class ThreadPoolAdapterRegister implements ApplicationRunner {
private final HttpAgent httpAgent;
private final BootstrapProperties properties;
private final ConfigurableEnvironment environment;
private final InetUtils hippo4JInetUtils;
@Override
public void run(ApplicationArguments args) throws Exception {
Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
List<ThreadPoolAdapterCacheConfig> cacheConfigList = Lists.newArrayList();
threadPoolAdapterMap.forEach((key, val) -> {
ThreadPoolAdapterCacheConfig cacheConfig = new ThreadPoolAdapterCacheConfig();
cacheConfig.setMark(val.mark());
String tenantItemKey = properties.getNamespace() + IDENTIFY_SLICER_SYMBOL + properties.getItemId();
cacheConfig.setTenantItemKey(tenantItemKey);
String clientAddress = CloudCommonIdUtil.getDefaultInstanceId(environment, hippo4JInetUtils);
cacheConfig.setClientAddress(clientAddress);
cacheConfig.setThreadPoolAdapterStates(val.getThreadPoolStates());
cacheConfigList.add(cacheConfig);
});
if (CollectionUtil.isNotEmpty(cacheConfigList)) {
try {
Result result = httpAgent.httpPost(REGISTER_ADAPTER_PATH, cacheConfigList);
if (!result.isSuccess()) {
log.warn("Failed to register third-party thread pool data.");
}
} catch (Throwable ex) {
log.error("Failed to register third-party thread pool data.", ex);
}
}
}
}
Loading…
Cancel
Save