Add spring cloud stream rocketmq consumption thread pool monitoring and dynamic change (#215)

pull/233/head
chen.ma 2 years ago
parent eddec7b8b2
commit 132d25fa8a

@ -16,4 +16,36 @@
<artifactId>hippo4j-common</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Implementation-Title>${project.artifactId}</Implementation-Title>
<Implementation-Version>${project.version}</Implementation-Version>
<Build-Time>${maven.build.timestamp}</Build-Time>
<Built-By>chen.ma</Built-By>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.3</version>
<executions>
<execution>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -40,9 +40,8 @@ public interface ThreadPoolAdapter {
/**
* Modify the core parameters of the framework thread pool.
*
* @param identify {@link ThreadPoolAdapter#mark} + Thread pool unique id
* @param threadPoolAdapterParameter Thread pool parameters to be modified
* @return
*/
boolean updateThreadPool(String identify, ThreadPoolAdapterParameter threadPoolAdapterParameter);
boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter);
}

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.adapter.base;
import cn.hippo4j.common.config.ApplicationContextHolder;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.InitializingBean;
import java.util.Map;
/**
* Thread-pool adapter bean container.
*/
public class ThreadPoolAdapterBeanContainer implements InitializingBean {
/**
* Store three-party thread pool framework bean instances.
*/
public static final Map<String, ThreadPoolAdapter> THREAD_POOL_ADAPTER_BEAN_CONTAINER = Maps.newConcurrentMap();
@Override
public void afterPropertiesSet() throws Exception {
Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
threadPoolAdapterMap.forEach((key, val) -> THREAD_POOL_ADAPTER_BEAN_CONTAINER.put(val.mark(), val));
}
}

@ -25,6 +25,16 @@ import lombok.Data;
@Data
public class ThreadPoolAdapterParameter {
/**
* mark
*/
private String mark;
/**
* identify
*/
private String identify;
/**
* Core size.
*/

@ -15,5 +15,43 @@
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter-base</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>${spring-cloud-starter-stream-rocketmq.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Implementation-Title>${project.artifactId}</Implementation-Title>
<Implementation-Version>${project.version}</Implementation-Version>
<Build-Time>${maven.build.timestamp}</Build-Time>
<Built-By>chen.ma</Built-By>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.3</version>
<executions>
<execution>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -20,15 +20,29 @@ package cn.hippo4j.adapter.springcloud.stream.rocketmq;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ReflectUtil;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binding.InputBindingLifecycle;
import org.springframework.context.ApplicationListener;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
/**
* Spring cloud stream rocketMQ thread-pool adapter.
@ -40,7 +54,7 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
@Override
public String mark() {
return "RocketMQ_SpringCloud_Stream";
return "RocketMQSpringCloudStream";
}
@Override
@ -57,7 +71,8 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
}
@Override
public boolean updateThreadPool(String identify, ThreadPoolAdapterParameter threadPoolAdapterParameter) {
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
String identify = threadPoolAdapterParameter.getIdentify();
ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(identify);
if (rocketMQConsumeExecutor != null) {
int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize();
@ -76,6 +91,22 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
// TODO Get rocketMQ consumer thread pool collection
InputBindingLifecycle bindingLifecycle = ApplicationContextHolder.getBean(InputBindingLifecycle.class);
Collection<Binding<Object>> inputBindings = Optional.ofNullable(ReflectUtil.getFieldValue(bindingLifecycle, "inputBindings")).map(each -> (Collection<Binding<Object>>) each).orElse(null);
if (CollectionUtil.isEmpty(inputBindings)) {
log.info("InputBindings record not found.");
}
for (Binding<Object> each : inputBindings) {
String bindingName = each.getBindingName();
String buildKey = mark() + IDENTIFY_SLICER_SYMBOL + bindingName;
DefaultBinding defaultBinding = (DefaultBinding) each;
RocketMQInboundChannelAdapter lifecycle = (RocketMQInboundChannelAdapter) cn.hutool.core.util.ReflectUtil.getFieldValue(defaultBinding, "lifecycle");
RocketMQListenerBindingContainer rocketMQListenerContainer = (RocketMQListenerBindingContainer) cn.hutool.core.util.ReflectUtil.getFieldValue(lifecycle, "rocketMQListenerContainer");
DefaultMQPushConsumer consumer = rocketMQListenerContainer.getConsumer();
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = consumer.getDefaultMQPushConsumerImpl();
ConsumeMessageConcurrentlyService consumeMessageService = (ConsumeMessageConcurrentlyService) defaultMQPushConsumerImpl.getConsumeMessageService();
ThreadPoolExecutor consumeExecutor = (ThreadPoolExecutor) cn.hutool.core.util.ReflectUtil.getFieldValue(consumeMessageService, "consumeExecutor");
ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(buildKey, consumeExecutor);
}
}
}

@ -18,6 +18,7 @@
package cn.hippo4j.config.config;
import cn.hippo4j.common.config.ApplicationContextHolder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@ -35,7 +36,8 @@ import static cn.hippo4j.common.constant.Constants.AVAILABLE_PROCESSORS;
public class CommonConfig {
@Bean
public ApplicationContextHolder simpleApplicationContextHolder() {
@ConditionalOnMissingBean
public ApplicationContextHolder hippo4JApplicationContextHolder() {
return new ApplicationContextHolder();
}

@ -45,10 +45,7 @@ import cn.hippo4j.core.springboot.starter.refresher.event.WebExecutorListener;
import cn.hippo4j.core.springboot.starter.support.DynamicThreadPoolPostProcessor;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.*;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -80,6 +77,7 @@ public class DynamicThreadPoolCoreAutoConfiguration {
private static final String ZK_CONFIG_KEY = "org.apache.curator.framework.CuratorFramework";
@Bean
@ConditionalOnMissingBean
@Order(Ordered.HIGHEST_PRECEDENCE)
public ApplicationContextHolder hippo4JApplicationContextHolder() {
return new ApplicationContextHolder();

@ -7,7 +7,6 @@
<artifactId>hippo4j-spring-boot-starter-adapter</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq</artifactId>
<name>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq</name>

@ -18,6 +18,8 @@
package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rocketmq;
import cn.hippo4j.adapter.springcloud.stream.rocketmq.SpringCloudStreamRocketMQThreadPoolAdapter;
import cn.hippo4j.common.config.ApplicationContextHolder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -29,8 +31,15 @@ import org.springframework.context.annotation.Configuration;
public class SpringCloudStreamRocketMQAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public ApplicationContextHolder simpleApplicationContextHolder() {
return new ApplicationContextHolder();
}
@Bean
@SuppressWarnings("all")
@ConditionalOnProperty(name = "spring.cloud.stream.rocketmq.binder.name-server")
public SpringCloudStreamRocketMQThreadPoolAdapter springCloudStreamRocketMQThreadPoolAdapter() {
public SpringCloudStreamRocketMQThreadPoolAdapter springCloudStreamRocketMQThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) {
return new SpringCloudStreamRocketMQThreadPoolAdapter();
}
}

@ -9,9 +9,6 @@
</parent>
<artifactId>hippo4j-spring-boot-starter</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>Thread pool dynamic parameter adjustment, alarming, status viewing and monitoring functions</description>
<dependencies>
@ -84,6 +81,11 @@
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-adapter-base</artifactId>
</dependency>
</dependencies>
<build>

@ -17,6 +17,7 @@
package cn.hippo4j.springboot.starter.config;
import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer;
import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.config.UtilAutoConfiguration;
@ -27,8 +28,9 @@ import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.springboot.starter.controller.WebThreadPoolRunStateController;
import cn.hippo4j.springboot.starter.controller.ThreadPoolAdapterController;
import cn.hippo4j.springboot.starter.controller.WebThreadPoolController;
import cn.hippo4j.springboot.starter.controller.WebThreadPoolRunStateController;
import cn.hippo4j.springboot.starter.core.*;
import cn.hippo4j.springboot.starter.event.ApplicationContentPostProcessor;
import cn.hippo4j.springboot.starter.monitor.ReportingEventExecutor;
@ -74,6 +76,7 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
@ConditionalOnMissingBean
@Order(Ordered.HIGHEST_PRECEDENCE)
public ApplicationContextHolder hippo4JApplicationContextHolder() {
return new ApplicationContextHolder();
@ -135,6 +138,16 @@ public class DynamicThreadPoolAutoConfiguration {
return new RunTimeInfoCollector(properties);
}
@Bean
public ThreadPoolAdapterController threadPoolAdapterController() {
return new ThreadPoolAdapterController();
}
@Bean
public ThreadPoolAdapterBeanContainer threadPoolAdapterBeanContainer() {
return new ThreadPoolAdapterBeanContainer();
}
@Bean
public ApplicationContentPostProcessor applicationContentPostProcessor() {
return new ApplicationContentPostProcessor();

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.controller;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.Optional;
import static cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer.THREAD_POOL_ADAPTER_BEAN_CONTAINER;
/**
* Thread-pool adapter controller.
*/
@RestController
@AllArgsConstructor
public class ThreadPoolAdapterController {
@PostMapping("/update/adapter/thread-pool")
public Result<Void> updateAdapterThreadPool(@RequestBody ThreadPoolAdapterParameter requestParameter) {
ThreadPoolAdapter threadPoolAdapter = THREAD_POOL_ADAPTER_BEAN_CONTAINER.get(requestParameter.getMark());
Optional.ofNullable(threadPoolAdapter).ifPresent(each -> each.updateThreadPool(requestParameter));
return Results.success();
}
}
Loading…
Cancel
Save