diff --git a/hippo4j-adapter/hippo4j-adapter-base/pom.xml b/hippo4j-adapter/hippo4j-adapter-base/pom.xml
index 9b3a5c85..96886366 100644
--- a/hippo4j-adapter/hippo4j-adapter-base/pom.xml
+++ b/hippo4j-adapter/hippo4j-adapter-base/pom.xml
@@ -16,4 +16,36 @@
hippo4j-common
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ ${project.artifactId}
+ ${project.version}
+ ${maven.build.timestamp}
+ chen.ma
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 2.10.3
+
+
+
+ jar
+
+
+
+
+
+
diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapter.java
index 85369659..ce7b3b4f 100644
--- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapter.java
+++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapter.java
@@ -40,9 +40,8 @@ public interface ThreadPoolAdapter {
/**
* Modify the core parameters of the framework thread pool.
*
- * @param identify {@link ThreadPoolAdapter#mark} + Thread pool unique id
* @param threadPoolAdapterParameter Thread pool parameters to be modified
* @return
*/
- boolean updateThreadPool(String identify, ThreadPoolAdapterParameter threadPoolAdapterParameter);
+ boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter);
}
diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterBeanContainer.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterBeanContainer.java
new file mode 100644
index 00000000..0688192d
--- /dev/null
+++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterBeanContainer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.adapter.base;
+
+import cn.hippo4j.common.config.ApplicationContextHolder;
+import com.google.common.collect.Maps;
+import org.springframework.beans.factory.InitializingBean;
+
+import java.util.Map;
+
+/**
+ * Thread-pool adapter bean container.
+ */
+public class ThreadPoolAdapterBeanContainer implements InitializingBean {
+
+ /**
+ * Store three-party thread pool framework bean instances.
+ */
+ public static final Map THREAD_POOL_ADAPTER_BEAN_CONTAINER = Maps.newConcurrentMap();
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
+ threadPoolAdapterMap.forEach((key, val) -> THREAD_POOL_ADAPTER_BEAN_CONTAINER.put(val.mark(), val));
+ }
+}
diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterParameter.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterParameter.java
index b097a136..056a1ba8 100644
--- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterParameter.java
+++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterParameter.java
@@ -25,6 +25,16 @@ import lombok.Data;
@Data
public class ThreadPoolAdapterParameter {
+ /**
+ * mark
+ */
+ private String mark;
+
+ /**
+ * identify
+ */
+ private String identify;
+
/**
* Core size.
*/
diff --git a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/pom.xml b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/pom.xml
index 9b6bbe28..c22e390c 100644
--- a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/pom.xml
+++ b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/pom.xml
@@ -15,5 +15,43 @@
cn.hippo4j
hippo4j-adapter-base
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-stream-rocketmq
+ ${spring-cloud-starter-stream-rocketmq.version}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ ${project.artifactId}
+ ${project.version}
+ ${maven.build.timestamp}
+ chen.ma
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 2.10.3
+
+
+
+ jar
+
+
+
+
+
+
diff --git a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java
index 1d000665..0a8400f7 100644
--- a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java
+++ b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rocketmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rocketmq/SpringCloudStreamRocketMQThreadPoolAdapter.java
@@ -20,15 +20,29 @@ package cn.hippo4j.adapter.springcloud.stream.rocketmq;
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
+import cn.hippo4j.common.config.ApplicationContextHolder;
+import cn.hippo4j.common.toolkit.CollectionUtil;
+import cn.hippo4j.common.toolkit.ReflectUtil;
+import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.cloud.stream.binder.Binding;
+import org.springframework.cloud.stream.binder.DefaultBinding;
+import org.springframework.cloud.stream.binding.InputBindingLifecycle;
import org.springframework.context.ApplicationListener;
+import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
+import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
/**
* Spring cloud stream rocketMQ thread-pool adapter.
@@ -40,7 +54,7 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
@Override
public String mark() {
- return "RocketMQ_SpringCloud_Stream";
+ return "RocketMQSpringCloudStream";
}
@Override
@@ -57,7 +71,8 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
}
@Override
- public boolean updateThreadPool(String identify, ThreadPoolAdapterParameter threadPoolAdapterParameter) {
+ public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
+ String identify = threadPoolAdapterParameter.getIdentify();
ThreadPoolExecutor rocketMQConsumeExecutor = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(identify);
if (rocketMQConsumeExecutor != null) {
int originalCoreSize = rocketMQConsumeExecutor.getCorePoolSize();
@@ -76,6 +91,22 @@ public class SpringCloudStreamRocketMQThreadPoolAdapter implements ThreadPoolAda
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
- // TODO Get rocketMQ consumer thread pool collection
+ InputBindingLifecycle bindingLifecycle = ApplicationContextHolder.getBean(InputBindingLifecycle.class);
+ Collection> inputBindings = Optional.ofNullable(ReflectUtil.getFieldValue(bindingLifecycle, "inputBindings")).map(each -> (Collection>) each).orElse(null);
+ if (CollectionUtil.isEmpty(inputBindings)) {
+ log.info("InputBindings record not found.");
+ }
+ for (Binding