mirror of https://github.com/longtai-cn/hippo4j
feat: add support spring-cloud-stream-rabbitmq (#564)
parent
9a18879e9e
commit
0cfaf5d914
@ -0,0 +1,55 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-adapter</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<artifactId>hippo4j-adapter-spring-cloud-stream-rabbitmq</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-adapter-base</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
|
||||
<version>${spring-cloud-starter-stream-rabbitmq.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifestEntries>
|
||||
<Implementation-Title>${project.artifactId}</Implementation-Title>
|
||||
<Implementation-Version>${project.version}</Implementation-Version>
|
||||
<Build-Time>${maven.build.timestamp}</Build-Time>
|
||||
<Built-By>chen.ma</Built-By>
|
||||
</manifestEntries>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<version>2.10.3</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
@ -0,0 +1,166 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.adapter.springcloud.stream.rabbitmq;
|
||||
|
||||
import cn.hippo4j.adapter.base.ThreadPoolAdapter;
|
||||
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
|
||||
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
|
||||
import cn.hippo4j.common.config.ApplicationContextHolder;
|
||||
import cn.hippo4j.common.toolkit.CollectionUtil;
|
||||
import cn.hippo4j.common.toolkit.ReflectUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.boot.context.event.ApplicationStartedEvent;
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.DefaultBinding;
|
||||
import org.springframework.cloud.stream.binding.InputBindingLifecycle;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
|
||||
|
||||
/**
|
||||
* Spring cloud stream rabbimq thread-pool adapter.
|
||||
*/
|
||||
@Slf4j
|
||||
public class SpringCloudStreamRabbitMQThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {
|
||||
|
||||
private final Map<String, AbstractMessageListenerContainer> ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR = Maps.newHashMap();
|
||||
|
||||
@Override
|
||||
public String mark() {
|
||||
return "rabbitMQSpringCloudStream";
|
||||
}
|
||||
|
||||
@Override
|
||||
public ThreadPoolAdapterState getThreadPoolState(String identify) {
|
||||
ThreadPoolAdapterState result = new ThreadPoolAdapterState();
|
||||
AbstractMessageListenerContainer messageListenerContainer = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(identify);
|
||||
if (messageListenerContainer != null) {
|
||||
result.setThreadPoolKey(identify);
|
||||
if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
|
||||
int concurrentConsumers = (int) ReflectUtil.getFieldValue(messageListenerContainer, "concurrentConsumers");
|
||||
result.setCoreSize(concurrentConsumers);
|
||||
Object maxConcurrentConsumers = ReflectUtil.getFieldValue(messageListenerContainer, "maxConcurrentConsumers");
|
||||
if (maxConcurrentConsumers != null) {
|
||||
result.setMaximumSize((Integer) maxConcurrentConsumers);
|
||||
} else {
|
||||
result.setMaximumSize(concurrentConsumers);
|
||||
}
|
||||
|
||||
} else if (messageListenerContainer instanceof DirectMessageListenerContainer) {
|
||||
int consumersPerQueue = (int) ReflectUtil.getFieldValue(messageListenerContainer, "consumersPerQueue");
|
||||
result.setCoreSize(consumersPerQueue);
|
||||
result.setMaximumSize(consumersPerQueue);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
log.warn("[{}] rabbitMQ consuming thread pool not found.", identify);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ThreadPoolAdapterState> getThreadPoolStates() {
|
||||
List<ThreadPoolAdapterState> adapterStateList = Lists.newArrayList();
|
||||
ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.forEach(
|
||||
(key, val) -> adapterStateList.add(getThreadPoolState(key)));
|
||||
return adapterStateList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
|
||||
String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
|
||||
AbstractMessageListenerContainer messageListenerContainer = ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(threadPoolKey);
|
||||
if (messageListenerContainer != null) {
|
||||
synchronized (ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR) {
|
||||
Integer corePoolSize = threadPoolAdapterParameter.getCorePoolSize();
|
||||
Integer maximumPoolSize = threadPoolAdapterParameter.getMaximumPoolSize();
|
||||
if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
|
||||
int originalCoreSize = (int) ReflectUtil.getFieldValue(messageListenerContainer, "concurrentConsumers");
|
||||
Object maxConcurrentConsumers = ReflectUtil.getFieldValue(messageListenerContainer, "maxConcurrentConsumers");
|
||||
int originalMaximumPoolSize;
|
||||
if (maxConcurrentConsumers != null) {
|
||||
originalMaximumPoolSize = (Integer) maxConcurrentConsumers;
|
||||
} else {
|
||||
originalMaximumPoolSize = originalCoreSize;
|
||||
}
|
||||
SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer) messageListenerContainer;
|
||||
if (originalCoreSize > maximumPoolSize) {
|
||||
simpleMessageListenerContainer.setConcurrentConsumers(corePoolSize);
|
||||
simpleMessageListenerContainer.setMaxConcurrentConsumers(maximumPoolSize);
|
||||
} else {
|
||||
simpleMessageListenerContainer.setMaxConcurrentConsumers(maximumPoolSize);
|
||||
simpleMessageListenerContainer.setConcurrentConsumers(corePoolSize);
|
||||
}
|
||||
log.info("[{}] rabbitMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}",
|
||||
threadPoolKey,
|
||||
String.format(CHANGE_DELIMITER, originalCoreSize, corePoolSize),
|
||||
String.format(CHANGE_DELIMITER, originalMaximumPoolSize, maximumPoolSize));
|
||||
} else if (messageListenerContainer instanceof DirectMessageListenerContainer) {
|
||||
int originalCoreSize = (int) ReflectUtil.getFieldValue(messageListenerContainer, "consumersPerQueue");
|
||||
DirectMessageListenerContainer directMessageListenerContainer = (DirectMessageListenerContainer) messageListenerContainer;
|
||||
directMessageListenerContainer.setConsumersPerQueue(maximumPoolSize);
|
||||
log.info("[{}] rabbitMQ consumption thread pool parameter change. coreSize: {}",
|
||||
threadPoolKey,
|
||||
String.format(CHANGE_DELIMITER, originalCoreSize, corePoolSize));
|
||||
} else {
|
||||
log.warn("[{}] rabbitMQ consuming thread pool not support. messageListenerContainer: {}", threadPoolKey, messageListenerContainer.getClass());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
log.warn("[{}] rabbitMQ consuming thread pool not found.", threadPoolKey);
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationStartedEvent event) {
|
||||
InputBindingLifecycle bindingLifecycle = ApplicationContextHolder.getBean(InputBindingLifecycle.class);
|
||||
Collection<Binding<Object>> inputBindings = Optional.ofNullable(ReflectUtil.getFieldValue(bindingLifecycle, "inputBindings"))
|
||||
.map(each -> (Collection<Binding<Object>>) each).orElse(null);
|
||||
|
||||
if (CollectionUtil.isEmpty(inputBindings)) {
|
||||
log.info("InputBindings record not found.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
for (Binding<Object> each : inputBindings) {
|
||||
String bindingName = each.getBindingName();
|
||||
DefaultBinding defaultBinding = (DefaultBinding) each;
|
||||
Object lifecycle = ReflectUtil.getFieldValue(defaultBinding, "lifecycle");
|
||||
if (lifecycle instanceof AmqpInboundChannelAdapter) {
|
||||
AbstractMessageListenerContainer rabbitMQListenerContainer = (AbstractMessageListenerContainer) ReflectUtil.getFieldValue(lifecycle, "messageListenerContainer");
|
||||
ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(bindingName, rabbitMQListenerContainer);
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
log.error("Failed to get input-bindings thread pool.", ex);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,78 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-example</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<artifactId>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq-example</artifactId>
|
||||
|
||||
<properties>
|
||||
<maven.deploy.skip>true</maven.deploy.skip>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-json</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.alibaba</groupId>
|
||||
<artifactId>fastjson</artifactId>
|
||||
<version>1.2.76</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.projectlombok</groupId>
|
||||
<artifactId>lombok</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-spring-boot-starter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-example-core</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<finalName>${project.artifactId}</finalName>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>repackage</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rabbitmq.example;
|
||||
|
||||
import cn.hippo4j.example.core.dto.SendMessageDTO;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.cloud.stream.function.StreamBridge;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Message produce.
|
||||
*/
|
||||
@Slf4j
|
||||
@RestController
|
||||
@AllArgsConstructor
|
||||
public class MessageProduce {
|
||||
|
||||
private final StreamBridge streamBridge;
|
||||
|
||||
@GetMapping("/message/send")
|
||||
public String sendMessage(@RequestParam(required = false) Integer maxSendSize) {
|
||||
if (maxSendSize == null) {
|
||||
maxSendSize = 10;
|
||||
}
|
||||
for (int i = 0; i < maxSendSize; i++) {
|
||||
sendMessage0();
|
||||
}
|
||||
return "success";
|
||||
}
|
||||
|
||||
private void sendMessage0() {
|
||||
String keys = UUID.randomUUID().toString();
|
||||
SendMessageDTO payload = SendMessageDTO.builder()
|
||||
.receiver("156011xxx91")
|
||||
.uid(keys)
|
||||
.build();
|
||||
long startTime = System.currentTimeMillis();
|
||||
boolean sendResult = false;
|
||||
try {
|
||||
sendResult = streamBridge.send("demoOutput", payload);
|
||||
} finally {
|
||||
log.info("Send status: {}, Keys: {}, Execute time: {} ms, Message: {}",
|
||||
sendResult,
|
||||
keys,
|
||||
System.currentTimeMillis() - startTime,
|
||||
JSON.toJSONString(payload));
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rabbitmq.example;
|
||||
|
||||
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.support.AmqpHeaders;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
@Slf4j
|
||||
@EnableDynamicThreadPool
|
||||
@SpringBootApplication
|
||||
public class ServerAdapterSpringCloudStreamRabbitMQApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ServerAdapterSpringCloudStreamRabbitMQApplication.class, args);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Consumer<Message<String>> demoInput() {
|
||||
return message -> {
|
||||
MessageHeaders headers = message.getHeaders();
|
||||
log.info("Input current thread name: {} ,{} received from partition {}",
|
||||
Thread.currentThread().getName(),
|
||||
JSON.toJSONString(message.getPayload()),
|
||||
headers.get(AmqpHeaders.CONSUMER_QUEUE));
|
||||
};
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
server.port=8090
|
||||
logging.level.org.springframework.amqp.
|
||||
|
||||
spring.profiles.active=dev
|
||||
spring.dynamic.thread-pool.server-addr=http://localhost:6691
|
||||
spring.dynamic.thread-pool.namespace=prescription
|
||||
spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
|
||||
spring.dynamic.thread-pool.username=admin
|
||||
spring.dynamic.thread-pool.password=123456
|
||||
|
||||
spring.rabbitmq.host=localhost
|
||||
spring.rabbitmq.port=5672
|
||||
spring.rabbitmq.username=root
|
||||
spring.rabbitmq.password=123456
|
||||
spring.rabbitmq.virtual-host=/
|
||||
|
||||
# Please replace the local configuration.
|
||||
spring.cloud.stream.binders.defaultRabbit.type=rabbit
|
||||
|
||||
# === produce ===
|
||||
spring.cloud.stream.bindings.demoOutput.destination=exchange-demo
|
||||
spring.cloud.stream.bindings.demoOutput.content-type=application/json
|
||||
|
||||
# === consume ===
|
||||
spring.cloud.stream.bindings.demoInput-in-0.destination=exchange-demo
|
||||
spring.cloud.stream.bindings.demoInput-in-0.content-type=application/json
|
||||
spring.cloud.stream.bindings.demoInput-in-0.group=myGroup
|
||||
spring.cloud.stream.bindings.demoInput-in-0.consumer.concurrency=1
|
@ -0,0 +1,51 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-spring-boot-starter-adapter</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<artifactId>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq</artifactId>
|
||||
<name>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rabbitmq</name>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>cn.hippo4j</groupId>
|
||||
<artifactId>hippo4j-adapter-spring-cloud-stream-rabbitmq</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<configuration>
|
||||
<archive>
|
||||
<manifestEntries>
|
||||
<Implementation-Title>${project.artifactId}</Implementation-Title>
|
||||
<Implementation-Version>${project.version}</Implementation-Version>
|
||||
<Build-Time>${maven.build.timestamp}</Build-Time>
|
||||
<Built-By>chen.ma</Built-By>
|
||||
</manifestEntries>
|
||||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<version>2.10.3</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package cn.hippo4j.springboot.starter.adapter.springcloud.stream.rabbitmq;
|
||||
|
||||
import cn.hippo4j.adapter.springcloud.stream.rabbitmq.SpringCloudStreamRabbitMQThreadPoolAdapter;
|
||||
import cn.hippo4j.common.config.ApplicationContextHolder;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* Spring cloud stream rabbitmq adapter auto configuration.
|
||||
*
|
||||
* @author lijianxin
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ConditionalOnClass(RabbitMessageChannelBinderConfiguration.class)
|
||||
public class SpringCloudStreamRabbitMQAdapterAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public ApplicationContextHolder simpleApplicationContextHolder() {
|
||||
return new ApplicationContextHolder();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@SuppressWarnings("all")
|
||||
@ConditionalOnProperty("spring.rabbitmq.host")
|
||||
public SpringCloudStreamRabbitMQThreadPoolAdapter springCloudStreamRabbitMQThreadPoolAdapter(ApplicationContextHolder applicationContextHolder) {
|
||||
return new SpringCloudStreamRabbitMQThreadPoolAdapter();
|
||||
}
|
||||
}
|
@ -0,0 +1 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.hippo4j.springboot.starter.adapter.springcloud.stream.rabbitmq.SpringCloudStreamRabbitMQAdapterAutoConfiguration
|
Loading…
Reference in new issue