From d58a7119effe92d63a311848d50f4dab8a2d4622 Mon Sep 17 00:00:00 2001
From: lijianxin <1064730540@qq.com>
Date: Thu, 25 Aug 2022 10:59:22 +0800
Subject: [PATCH] feat: add support spring-cloud-stream-rabbitmq
---
docs/docs/community/developer.md | 9 +-
.../pom.xml | 55 ++++++
...gCloudStreamRabbitMQThreadPoolAdapter.java | 166 ++++++++++++++++++
hippo4j-adapter/pom.xml | 1 +
.../pom.xml | 78 ++++++++
.../rabbitmq/example/MessageProduce.java | 70 ++++++++
...rSpringCloudStreamRabbitMQApplication.java | 52 ++++++
.../src/main/resources/application.properties | 28 +++
hippo4j-example/pom.xml | 1 +
.../pom.xml | 51 ++++++
...treamRabbitMQAdapterAutoConfiguration.java | 50 ++++++
.../main/resources/META-INF/spring.factories | 1 +
.../pom.xml | 1 +
pom.xml | 1 +
14 files changed, 563 insertions(+), 1 deletion(-)
create mode 100644 hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rabbitmq/pom.xml
create mode 100644 hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rabbitmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rabbitmq/SpringCloudStreamRabbitMQThreadPoolAdapter.java
create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq-example/pom.xml
create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rabbitmq/example/MessageProduce.java
create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq-example/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rabbitmq/example/ServerAdapterSpringCloudStreamRabbitMQApplication.java
create mode 100644 hippo4j-example/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq-example/src/main/resources/application.properties
create mode 100644 hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq/pom.xml
create mode 100644 hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq/src/main/java/cn/hippo4j/springboot/starter/adapter/springcloud/stream/rabbitmq/SpringCloudStreamRabbitMQAdapterAutoConfiguration.java
create mode 100644 hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq/src/main/resources/META-INF/spring.factories
diff --git a/docs/docs/community/developer.md b/docs/docs/community/developer.md
index 827427bc..bf813e09 100644
--- a/docs/docs/community/developer.md
+++ b/docs/docs/community/developer.md
@@ -45,5 +45,12 @@ sidebar_position: 2
核心开发者 |
weihubeats@163.com |
-
+
+  |
+ 李剑鑫 |
+ BigXin0109 |
+ Only丶Big |
+ 核心开发者 |
+ 1064730540@qq.com |
+
diff --git a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rabbitmq/pom.xml b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rabbitmq/pom.xml
new file mode 100644
index 00000000..d700d8a6
--- /dev/null
+++ b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rabbitmq/pom.xml
@@ -0,0 +1,55 @@
+
+
+ 4.0.0
+
+ cn.hippo4j
+ hippo4j-adapter
+ ${revision}
+
+ hippo4j-adapter-spring-cloud-stream-rabbitmq
+
+
+
+ cn.hippo4j
+ hippo4j-adapter-base
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-stream-rabbit
+ ${spring-cloud-starter-stream-rabbitmq.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ ${project.artifactId}
+ ${project.version}
+ ${maven.build.timestamp}
+ chen.ma
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 2.10.3
+
+
+
+ jar
+
+
+
+
+
+
+
diff --git a/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rabbitmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rabbitmq/SpringCloudStreamRabbitMQThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rabbitmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rabbitmq/SpringCloudStreamRabbitMQThreadPoolAdapter.java
new file mode 100644
index 00000000..edabaedf
--- /dev/null
+++ b/hippo4j-adapter/hippo4j-adapter-spring-cloud-stream-rabbitmq/src/main/java/cn/hippo4j/adapter/springcloud/stream/rabbitmq/SpringCloudStreamRabbitMQThreadPoolAdapter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.adapter.springcloud.stream.rabbitmq;
+
+import cn.hippo4j.adapter.base.ThreadPoolAdapter;
+import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
+import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
+import cn.hippo4j.common.config.ApplicationContextHolder;
+import cn.hippo4j.common.toolkit.CollectionUtil;
+import cn.hippo4j.common.toolkit.ReflectUtil;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.cloud.stream.binder.Binding;
+import org.springframework.cloud.stream.binder.DefaultBinding;
+import org.springframework.cloud.stream.binding.InputBindingLifecycle;
+import org.springframework.context.ApplicationListener;
+import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
+
+/**
+ * Spring cloud stream rabbimq thread-pool adapter.
+ */
+@Slf4j
+public class SpringCloudStreamRabbitMQThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener {
+
+ private final Map ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR = Maps.newHashMap();
+
+ @Override
+ public String mark() {
+ return "rabbitMQSpringCloudStream";
+ }
+
+ @Override
+ public ThreadPoolAdapterState getThreadPoolState(String identify) {
+ ThreadPoolAdapterState result = new ThreadPoolAdapterState();
+ AbstractMessageListenerContainer messageListenerContainer = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(identify);
+ if (messageListenerContainer != null) {
+ result.setThreadPoolKey(identify);
+ if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
+ int concurrentConsumers = (int) ReflectUtil.getFieldValue(messageListenerContainer, "concurrentConsumers");
+ result.setCoreSize(concurrentConsumers);
+ Object maxConcurrentConsumers = ReflectUtil.getFieldValue(messageListenerContainer, "maxConcurrentConsumers");
+ if (maxConcurrentConsumers != null) {
+ result.setMaximumSize((Integer) maxConcurrentConsumers);
+ } else {
+ result.setMaximumSize(concurrentConsumers);
+ }
+
+ } else if (messageListenerContainer instanceof DirectMessageListenerContainer) {
+ int consumersPerQueue = (int) ReflectUtil.getFieldValue(messageListenerContainer, "consumersPerQueue");
+ result.setCoreSize(consumersPerQueue);
+ result.setMaximumSize(consumersPerQueue);
+ }
+ return result;
+ }
+ log.warn("[{}] rabbitMQ consuming thread pool not found.", identify);
+ return result;
+ }
+
+ @Override
+ public List getThreadPoolStates() {
+ List adapterStateList = Lists.newArrayList();
+ ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.forEach(
+ (key, val) -> adapterStateList.add(getThreadPoolState(key)));
+ return adapterStateList;
+ }
+
+ @Override
+ public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
+ String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
+ AbstractMessageListenerContainer messageListenerContainer = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(threadPoolKey);
+ if (messageListenerContainer != null) {
+ synchronized (ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR) {
+ Integer corePoolSize = threadPoolAdapterParameter.getCorePoolSize();
+ Integer maximumPoolSize = threadPoolAdapterParameter.getMaximumPoolSize();
+ if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
+ int originalCoreSize = (int) ReflectUtil.getFieldValue(messageListenerContainer, "concurrentConsumers");
+ Object maxConcurrentConsumers = ReflectUtil.getFieldValue(messageListenerContainer, "maxConcurrentConsumers");
+ int originalMaximumPoolSize;
+ if (maxConcurrentConsumers != null) {
+ originalMaximumPoolSize = (Integer) maxConcurrentConsumers;
+ } else {
+ originalMaximumPoolSize = originalCoreSize;
+ }
+ SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer) messageListenerContainer;
+ if (originalCoreSize > maximumPoolSize) {
+ simpleMessageListenerContainer.setConcurrentConsumers(corePoolSize);
+ simpleMessageListenerContainer.setMaxConcurrentConsumers(maximumPoolSize);
+ } else {
+ simpleMessageListenerContainer.setMaxConcurrentConsumers(maximumPoolSize);
+ simpleMessageListenerContainer.setConcurrentConsumers(corePoolSize);
+ }
+ log.info("[{}] rabbitMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
+ threadPoolKey,
+ String.format(CHANGE_DELIMITER, originalCoreSize, corePoolSize),
+ String.format(CHANGE_DELIMITER, originalMaximumPoolSize, maximumPoolSize));
+ } else if (messageListenerContainer instanceof DirectMessageListenerContainer) {
+ int originalCoreSize = (int) ReflectUtil.getFieldValue(messageListenerContainer, "consumersPerQueue");
+ DirectMessageListenerContainer directMessageListenerContainer = (DirectMessageListenerContainer) messageListenerContainer;
+ directMessageListenerContainer.setConsumersPerQueue(maximumPoolSize);
+ log.info("[{}] rabbitMQ consumption thread pool parameter change. coreSize: {}",
+ threadPoolKey,
+ String.format(CHANGE_DELIMITER, originalCoreSize, corePoolSize));
+ } else {
+ log.warn("[{}] rabbitMQ consuming thread pool not support. messageListenerContainer: {}", threadPoolKey, messageListenerContainer.getClass());
+ return false;
+ }
+ }
+
+ return true;
+ }
+ log.warn("[{}] rabbitMQ consuming thread pool not found.", threadPoolKey);
+ return false;
+ }
+
+ @Override
+ public void onApplicationEvent(ApplicationStartedEvent event) {
+ InputBindingLifecycle bindingLifecycle = ApplicationContextHolder.getBean(InputBindingLifecycle.class);
+ Collection> inputBindings = Optional.ofNullable(ReflectUtil.getFieldValue(bindingLifecycle, "inputBindings"))
+ .map(each -> (Collection>) each).orElse(null);
+
+ if (CollectionUtil.isEmpty(inputBindings)) {
+ log.info("InputBindings record not found.");
+ return;
+ }
+ try {
+ for (Binding