diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapter.java index ce7b3b4f..20b39602 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapter.java @@ -17,6 +17,8 @@ package cn.hippo4j.adapter.base; +import java.util.List; + /** * Adapt to the thread pool of the third-party framework. */ @@ -32,10 +34,19 @@ public interface ThreadPoolAdapter { /** * Get the core parameters of the framework thread pool. * - * @param identify {@link ThreadPoolAdapter#mark} + Thread pool unique id + * @param identify Thread pool unique id + * @return + */ + ThreadPoolAdapterState getThreadPoolState(String identify); + + /** + * Get the core parameters of the framework thread pool. + * * @return */ - ThreadPoolAdapterState getThreadPoolStateInfo(String identify); + default List getThreadPoolStates() { + return null; + } /** * Modify the core parameters of the framework thread pool. diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterCacheConfig.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterCacheConfig.java new file mode 100644 index 00000000..07b2c73c --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterCacheConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.base; + +import lombok.Data; + +import java.util.List; + +/** + * Thread-pool adapter cache config. + */ +@Data +public class ThreadPoolAdapterCacheConfig { + + /** + * Mark + */ + private String mark; + + /** + * Tenant item key + */ + private String tenantItemKey; + + /** + * Client identify + */ + private String clientIdentify; + + /** + * Active + */ + private String active; + + /** + * Client address + */ + private String clientAddress; + + /** + * Thread-pool adapter states + */ + private List threadPoolAdapterStates; +} diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterParameter.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterParameter.java index 056a1ba8..c4e8b31c 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterParameter.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterParameter.java @@ -26,22 +26,22 @@ import lombok.Data; public class ThreadPoolAdapterParameter { /** - * mark + * Mark */ private String mark; /** - * identify + * Thread-pool key */ - private String identify; + private String threadPoolKey; /** - * Core size. + * Core size */ private Integer coreSize; /** - * Maximum size. + * Maximum size */ private Integer maximumSize; } diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java index cd42f0fa..b46c6c26 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterState.java @@ -26,12 +26,42 @@ import lombok.Data; public class ThreadPoolAdapterState { /** - * Core size. + * Thread-pool keu + */ + private String threadPoolKey; + + /** + * Active + */ + private String active; + + /** + * identify + */ + private String identify; + + /** + * Client address + */ + private String clientAddress; + + /** + * Core size */ private Integer coreSize; /** - * Maximum size. + * Maximum size */ private Integer maximumSize; + + /** + * Blocking queue type + */ + private String blockingQueueType; + + /** + * Blocking queue capacity + */ + private Integer blockingQueueCapacity; } diff --git a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java index 584c9a97..455d6596 100644 --- a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java @@ -25,6 +25,7 @@ import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.ReflectUtil; import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -37,12 +38,12 @@ import org.springframework.cloud.stream.binding.InputBindingLifecycle; import org.springframework.context.ApplicationListener; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ThreadPoolExecutor; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; -import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; /** * Spring cloud stream rocketMQ thread-pool adapter. @@ -58,10 +59,12 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda } @Override - public ThreadPoolAdapterState getThreadPoolStateInfo(String identify) { + public ThreadPoolAdapterState getThreadPoolState(String identify) { ThreadPoolAdapterState result = new ThreadPoolAdapterState(); ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(identify); if (rocketMQConsumeExecutor != null) { + result.setThreadPoolKey(mark()); + result.setThreadPoolKey(identify); result.setCoreSize(rocketMQConsumeExecutor.getCorePoolSize()); result.setMaximumSize(rocketMQConsumeExecutor.getMaximumPoolSize()); return result; @@ -70,22 +73,30 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda return result; } + @Override + public List getThreadPoolStates() { + List adapterStateList = Lists.newArrayList(); + ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.forEach( + (key, val) -> adapterStateList.add(getThreadPoolState(key))); + return adapterStateList; + } + @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { - String identify = threadPoolAdapterParameter.getIdentify(); - ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(identify); + String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); + ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(threadPoolKey); if (rocketMQConsumeExecutor != null) { int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize(); int originalMaximumPoolSize = rocketMQConsumeExecutor.getMaximumPoolSize(); rocketMQConsumeExecutor.setCorePoolSize(threadPoolAdapterParameter.getCoreSize()); rocketMQConsumeExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumSize()); log.info("[{}] RocketMQ consumption thread pool parameter change. coreSize :: {}, maximumSize :: {}", - identify, + threadPoolKey, String.format(CHANGE_DELIMITER, originalCoreSize, rocketMQConsumeExecutor.getCorePoolSize()), String.format(CHANGE_DELIMITER, originalMaximumPoolSize, rocketMQConsumeExecutor.getMaximumPoolSize())); return true; } - log.warn("[{}] RocketMQ consuming thread pool not found.", identify); + log.warn("[{}] RocketMQ consuming thread pool not found.", threadPoolKey); return false; } @@ -99,7 +110,6 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda try { for (Binding each : inputBindings) { String bindingName = each.getBindingName(); - String buildKey = mark() + IDENTIFY_SLICER_SYMBOL + bindingName; DefaultBinding defaultBinding = (DefaultBinding) each; RocketMQInboundChannelAdapter lifecycle = (RocketMQInboundChannelAdapter) ReflectUtil.getFieldValue(defaultBinding, "lifecycle"); RocketMQListenerBindingContainer rocketMQListenerContainer = (RocketMQListenerBindingContainer) ReflectUtil.getFieldValue(lifecycle, "rocketMQListenerContainer"); @@ -107,7 +117,7 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = consumer.getDefaultMQPushConsumerImpl(); ConsumeMessageConcurrentlyService consumeMessageService = (ConsumeMessageConcurrentlyService) defaultMQPushConsumerImpl.getConsumeMessageService(); ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor"); - ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(buildKey, consumeExecutor); + ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(bindingName, consumeExecutor); } } catch (Exception ex) { log.error("Failed to get input-bindings thread pool.", ex); diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java index f1c5cf1c..e8157b37 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/constant/Constants.java @@ -67,6 +67,10 @@ public class Constants { public static final String MONITOR_PATH = BASE_PATH + "/monitor"; + public static final String REGISTER_ADAPTER_BASE_PATH = BASE_PATH + "/adapter/thread-pool"; + + public static final String REGISTER_ADAPTER_PATH = REGISTER_ADAPTER_BASE_PATH + "/register"; + public static final String HEALTH_CHECK_PATH = BASE_PATH + "/health/check"; public static final String PROBE_MODIFY_REQUEST = "Listening-Configs"; diff --git a/hippo4j-config/pom.xml b/hippo4j-config/pom.xml index f7a18244..b7fa540f 100644 --- a/hippo4j-config/pom.xml +++ b/hippo4j-config/pom.xml @@ -64,5 +64,10 @@ cn.hippo4j log-record-tool + + + cn.hippo4j + hippo4j-adapter-base + diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ThreadPoolAdapterController.java b/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ThreadPoolAdapterController.java new file mode 100644 index 00000000..112630e4 --- /dev/null +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/controller/ThreadPoolAdapterController.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.controller; + +import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; +import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.common.web.base.Results; +import cn.hippo4j.config.service.ThreadPoolAdapterService; +import lombok.AllArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH; + +/** + * Thread-pool adapter controller. + */ +@RestController +@AllArgsConstructor +public class ThreadPoolAdapterController { + + private final ThreadPoolAdapterService threadPoolAdapterService; + + @PostMapping(REGISTER_ADAPTER_PATH) + public Result registerAdapterThreadPool(@RequestBody List requestParameter) { + threadPoolAdapterService.register(requestParameter); + return Results.success(); + } +} diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/model/biz/adapter/ThreadPoolAdapterReqDTO.java b/hippo4j-config/src/main/java/cn/hippo4j/config/model/biz/adapter/ThreadPoolAdapterReqDTO.java new file mode 100644 index 00000000..19db8094 --- /dev/null +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/model/biz/adapter/ThreadPoolAdapterReqDTO.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.model.biz.adapter; + +import lombok.Data; + +import java.util.List; + +/** + * Thread-pool adapter req DTO. + */ +@Data +public class ThreadPoolAdapterReqDTO { + + /** + * Mark + */ + private String mark; + + /** + * Tenant + */ + private String tenant; + + /** + * Item + */ + private String item; + + /** + * Thread pool key + */ + private String threadPoolKey; + + /** + * Identify + */ + private String identify; + + /** + * Core size + */ + private Integer coreSize; + + /** + * Maximum size + */ + private Integer maximumSize; + + /** + * Client address list + */ + private List clientAddressList; +} diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/model/biz/adapter/ThreadPoolAdapterRespDTO.java b/hippo4j-config/src/main/java/cn/hippo4j/config/model/biz/adapter/ThreadPoolAdapterRespDTO.java new file mode 100644 index 00000000..bf13e0d4 --- /dev/null +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/model/biz/adapter/ThreadPoolAdapterRespDTO.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.model.biz.adapter; + +import lombok.Data; + +/** + * Thread-pool adapter resp DTO. + */ +@Data +public class ThreadPoolAdapterRespDTO { + + /** + * Identify + */ + private String identify; + + /** + * Active + */ + private String active; + + /** + * Client address + */ + private String clientAddress; + + /** + * Thread pool key + */ + private String threadPoolKey; + + /** + * Core size + */ + private Integer coreSize; + + /** + * Maximum size + */ + private Integer maximumSize; +} diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/LongPollingService.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/LongPollingService.java index 7a4b7e96..e9fa3e5b 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/service/LongPollingService.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/LongPollingService.java @@ -114,7 +114,7 @@ public class LongPollingService { @Override public void run() { try { - for (Iterator iter = allSubs.iterator(); iter.hasNext(); ) { + for (Iterator iter = allSubs.iterator(); iter.hasNext();) { ClientLongPolling clientSub = iter.next(); String identity = groupKey + GROUP_KEY_DELIMITER + identify; List parseMapForFilter = Lists.newArrayList(identity); diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java b/hippo4j-config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java new file mode 100644 index 00000000..2e7127ba --- /dev/null +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/service/ThreadPoolAdapterService.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.service; + +import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.common.toolkit.JSONUtil; +import cn.hippo4j.common.toolkit.StringUtil; +import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO; +import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO; +import cn.hutool.core.text.StrBuilder; +import cn.hutool.http.HttpUtil; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static cn.hippo4j.common.constant.Constants.HTTP_EXECUTE_TIMEOUT; +import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; + +/** + * Thread-pool adapter service. + */ +@Slf4j +@Service +public class ThreadPoolAdapterService { + + /** + * Map>>> + */ + private final Map>>> THREAD_POOL_ADAPTER_MAP = Maps.newConcurrentMap(); + + public synchronized void register(List requestParameter) { + for (ThreadPoolAdapterCacheConfig each : requestParameter) { + String mark = each.getMark(); + Map>> actual = THREAD_POOL_ADAPTER_MAP.get(mark); + if (CollectionUtil.isEmpty(actual)) { + actual = Maps.newHashMap(); + THREAD_POOL_ADAPTER_MAP.put(mark, actual); + } + Map> tenantItemMap = actual.get(each.getTenantItemKey()); + if (CollectionUtil.isEmpty(tenantItemMap)) { + tenantItemMap = Maps.newHashMap(); + actual.put(each.getTenantItemKey(), tenantItemMap); + } + List threadPoolAdapterStates = each.getThreadPoolAdapterStates(); + for (ThreadPoolAdapterState adapterState : threadPoolAdapterStates) { + List threadPoolKeyList = tenantItemMap.get(adapterState.getThreadPoolKey()); + if (CollectionUtil.isEmpty(threadPoolKeyList)) { + threadPoolKeyList = Lists.newArrayList(); + tenantItemMap.put(adapterState.getThreadPoolKey(), threadPoolKeyList); + } + threadPoolKeyList.add(each.getClientAddress()); + } + } + } + + public List query(ThreadPoolAdapterReqDTO requestParameter) { + List actual = Optional.ofNullable(THREAD_POOL_ADAPTER_MAP.get(requestParameter.getMark())) + .map(each -> each.get(requestParameter.getTenant() + IDENTIFY_SLICER_SYMBOL + requestParameter.getItem())) + .map(each -> each.get(requestParameter.getThreadPoolKey())) + .orElse(Lists.newArrayList()); + List result = Lists.newCopyOnWriteArrayList(); + actual.parallelStream().forEach(each -> { + String urlString = StrBuilder.create("http://", each, "/adapter/thread-pool/info").toString(); + Map param = Maps.newHashMap(); + param.put("mark", requestParameter.getMark()); + param.put("threadPoolKey", requestParameter.getThreadPoolKey()); + try { + String resultStr = HttpUtil.get(urlString, param, HTTP_EXECUTE_TIMEOUT); + if (StringUtil.isNotBlank(resultStr)) { + Result restResult = JSONUtil.parseObject(resultStr, Result.class); + result.add(restResult.getData()); + } + } catch (Throwable ex) { + log.error("Failed to get third-party thread pool data.", ex); + } + }); + return result; + } +} diff --git a/hippo4j-console/src/main/java/cn/hippo4j/console/controller/ThreadPoolAdapterController.java b/hippo4j-console/src/main/java/cn/hippo4j/console/controller/ThreadPoolAdapterController.java new file mode 100644 index 00000000..8ec0b62c --- /dev/null +++ b/hippo4j-console/src/main/java/cn/hippo4j/console/controller/ThreadPoolAdapterController.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.console.controller; + +import cn.hippo4j.common.toolkit.JSONUtil; +import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.common.web.base.Results; +import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterReqDTO; +import cn.hippo4j.config.model.biz.adapter.ThreadPoolAdapterRespDTO; +import cn.hippo4j.config.service.ThreadPoolAdapterService; +import cn.hutool.core.text.StrBuilder; +import cn.hutool.http.HttpUtil; +import lombok.AllArgsConstructor; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +import static cn.hippo4j.common.constant.Constants.HTTP_EXECUTE_TIMEOUT; +import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_BASE_PATH; + +/** + * Thread-pool adapter controller. + */ +@AllArgsConstructor +@RestController("threadPoolAdapterConsoleController") +public class ThreadPoolAdapterController { + + private final ThreadPoolAdapterService threadPoolAdapterService; + + @GetMapping(REGISTER_ADAPTER_BASE_PATH + "/query") + public Result> queryAdapterThreadPool(ThreadPoolAdapterReqDTO requestParameter) { + List result = threadPoolAdapterService.query(requestParameter); + return Results.success(result); + } + + @PostMapping(REGISTER_ADAPTER_BASE_PATH + "/update") + public Result updateAdapterThreadPool(@RequestBody ThreadPoolAdapterReqDTO requestParameter) { + for (String each : requestParameter.getClientAddressList()) { + String urlString = StrBuilder.create("http://", each, "/adapter/thread-pool/update").toString(); + HttpUtil.post(urlString, JSONUtil.toJSONString(requestParameter), HTTP_EXECUTE_TIMEOUT); + } + return Results.success(); + } + +} diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index ebac4dbb..c45c1f98 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -33,6 +33,7 @@ import cn.hippo4j.springboot.starter.controller.WebThreadPoolController; import cn.hippo4j.springboot.starter.controller.WebThreadPoolRunStateController; import cn.hippo4j.springboot.starter.core.*; import cn.hippo4j.springboot.starter.event.ApplicationContentPostProcessor; +import cn.hippo4j.springboot.starter.core.ThreadPoolAdapterRegister; import cn.hippo4j.springboot.starter.monitor.ReportingEventExecutor; import cn.hippo4j.springboot.starter.monitor.collect.RunTimeInfoCollector; import cn.hippo4j.springboot.starter.monitor.send.HttpConnectSender; @@ -139,8 +140,9 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - public ThreadPoolAdapterController threadPoolAdapterController() { - return new ThreadPoolAdapterController(); + @SuppressWarnings("all") + public ThreadPoolAdapterController threadPoolAdapterController(InetUtils hippo4JInetUtils) { + return new ThreadPoolAdapterController(environment, hippo4JInetUtils); } @Bean @@ -154,7 +156,14 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean + @SuppressWarnings("all") public WebThreadPoolController webThreadPoolController(WebThreadPoolHandlerChoose webThreadPoolServiceChoose) { return new WebThreadPoolController(webThreadPoolServiceChoose); } + + @Bean + @SuppressWarnings("all") + public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils) { + return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils); + } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/controller/ThreadPoolAdapterController.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/controller/ThreadPoolAdapterController.java index fe42cbba..f96e5b06 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/controller/ThreadPoolAdapterController.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/controller/ThreadPoolAdapterController.java @@ -19,9 +19,16 @@ package cn.hippo4j.springboot.starter.controller; import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.web.base.Result; import cn.hippo4j.common.web.base.Results; +import cn.hippo4j.core.toolkit.IdentifyUtil; +import cn.hippo4j.core.toolkit.inet.InetUtils; +import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil; import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @@ -33,12 +40,34 @@ import static cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer.THREAD_POOL /** * Thread-pool adapter controller. */ +@Slf4j @RestController @AllArgsConstructor public class ThreadPoolAdapterController { - @PostMapping("/update/adapter/thread-pool") + private final ConfigurableEnvironment environment; + + private final InetUtils hippo4JInetUtils; + + @GetMapping("/adapter/thread-pool/info") + public Result getAdapterThreadPool(ThreadPoolAdapterParameter requestParameter) { + ThreadPoolAdapter threadPoolAdapter = THREAD_POOL_ADAPTER_BEAN_CONTAINER.get(requestParameter.getMark()); + ThreadPoolAdapterState result = Optional.ofNullable(threadPoolAdapter).map(each -> { + ThreadPoolAdapterState threadPoolState = each.getThreadPoolState(requestParameter.getThreadPoolKey()); + String active = environment.getProperty("spring.profiles.active", "UNKNOWN"); + threadPoolState.setActive(active); + String clientAddress = CloudCommonIdUtil.getDefaultInstanceId(environment, hippo4JInetUtils); + threadPoolState.setClientAddress(clientAddress); + threadPoolState.setIdentify(IdentifyUtil.getIdentify()); + return threadPoolState; + }).orElse(null); + return Results.success(result); + } + + @PostMapping("/adapter/thread-pool/update") public Result updateAdapterThreadPool(@RequestBody ThreadPoolAdapterParameter requestParameter) { + log.info("[{}] Change third-party thread pool data. key: {}, coreSize: {}, maximumSize: {}", + requestParameter.getMark(), requestParameter.getThreadPoolKey(), requestParameter.getCoreSize(), requestParameter.getMaximumSize()); ThreadPoolAdapter threadPoolAdapter = THREAD_POOL_ADAPTER_BEAN_CONTAINER.get(requestParameter.getMark()); Optional.ofNullable(threadPoolAdapter).ifPresent(each -> each.updateThreadPool(requestParameter)); return Results.success(); diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java new file mode 100644 index 00000000..64283b1e --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.springboot.starter.core; + +import cn.hippo4j.adapter.base.ThreadPoolAdapter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.core.toolkit.inet.InetUtils; +import cn.hippo4j.springboot.starter.config.BootstrapProperties; +import cn.hippo4j.springboot.starter.remote.HttpAgent; +import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil; +import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.env.ConfigurableEnvironment; + +import java.util.List; +import java.util.Map; + +import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; +import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH; + +/** + * Thread-pool adapter register. + */ +@Slf4j +@AllArgsConstructor +public class ThreadPoolAdapterRegister implements ApplicationRunner { + + private final HttpAgent httpAgent; + + private final BootstrapProperties properties; + + private final ConfigurableEnvironment environment; + + private final InetUtils hippo4JInetUtils; + + @Override + public void run(ApplicationArguments args) throws Exception { + Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); + List cacheConfigList = Lists.newArrayList(); + threadPoolAdapterMap.forEach((key, val) -> { + ThreadPoolAdapterCacheConfig cacheConfig = new ThreadPoolAdapterCacheConfig(); + cacheConfig.setMark(val.mark()); + String tenantItemKey = properties.getNamespace() + IDENTIFY_SLICER_SYMBOL + properties.getItemId(); + cacheConfig.setTenantItemKey(tenantItemKey); + String clientAddress = CloudCommonIdUtil.getDefaultInstanceId(environment, hippo4JInetUtils); + cacheConfig.setClientAddress(clientAddress); + cacheConfig.setThreadPoolAdapterStates(val.getThreadPoolStates()); + cacheConfigList.add(cacheConfig); + }); + if (CollectionUtil.isNotEmpty(cacheConfigList)) { + try { + Result result = httpAgent.httpPost(REGISTER_ADAPTER_PATH, cacheConfigList); + if (!result.isSuccess()) { + log.warn("Failed to register third-party thread pool data."); + } + } catch (Throwable ex) { + log.error("Failed to register third-party thread pool data.", ex); + } + } + } +}