Merge pull request #248 from weihubeats/feature_rabbitmq

Feature rabbitmq
pull/253/head
Long Tai 2 years ago committed by GitHub
commit 72fc6e5656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -21,6 +21,7 @@ import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.toolkit.ReflectUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -77,6 +78,15 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
}
return threadPoolAdapterState;
}
public List<ThreadPoolAdapterState> getThreadPoolStates() {
List<ThreadPoolAdapterState> adapterStateList = Lists.newArrayList();
RABBITMQ_EXECUTOR.forEach(
(key, val) -> adapterStateList.add(getThreadPoolState(key)));
RABBITMQ_THREAD_POOL_TASK_EXECUTOR.forEach(
(key, val) -> adapterStateList.add(getThreadPoolState(key)));
return adapterStateList;
}
@Override
public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
@ -116,17 +126,18 @@ public class RabbitMQThreadPoolAdapter implements ThreadPoolAdapter, Application
// 优先获取用户配置的
AbstractMessageListenerContainer listenerContainer1 = consumerWorkService.createListenerContainer();
SimpleAsyncTaskExecutor fieldValue = (SimpleAsyncTaskExecutor) ReflectUtil.getFieldValue(listenerContainer1, FiledName);
log.info("rabbitmq executor name {}", FiledName);
RABBITMQ_EXECUTOR.put(FiledName, fieldValue);
} else {
if (executor instanceof ThreadPoolTaskExecutor) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor;
String beanName = (String) ReflectUtil.getFieldValue(threadPoolTaskExecutor, BEAN_NAME_FILED);
RABBITMQ_THREAD_POOL_TASK_EXECUTOR.put(beanName, threadPoolTaskExecutor);
log.info("rabbitmq executor name {}", beanName);
} else {
log.warn("Custom thread pools only support ThreadPoolTaskExecutor");
}
}
}
}
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example;
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

@ -39,7 +39,7 @@ public class RabbitMQThreadPoolConfig {
// 指定线程的最大数量
executor.setMaxPoolSize(10);
// 指定线程池维护线程的最少数量
executor.setCorePoolSize(2);
executor.setCorePoolSize(10);
// 指定等待处理的任务数
executor.setQueueCapacity(20);
executor.setThreadNamePrefix("RabbitListenerTaskExecutor-");
@ -52,8 +52,8 @@ public class RabbitMQThreadPoolConfig {
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(10);
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(2);
factory.setTaskExecutor(rabbitListenerTaskExecutor);
return factory;
}

@ -15,12 +15,12 @@
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example;
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants;
/**
* Simple MQ constant.
*/
public class SimpleMQConstant {
public interface SimpleMQConstant {
public static final String QUEUE_NAME = "framework_message-center_queue";
String QUEUE_NAME = "framework_message-center_queue";
}

@ -15,10 +15,10 @@
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example;
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.consumer;
import cn.hippo4j.example.core.dto.SendMessageDTO;
import com.fasterxml.jackson.core.JsonProcessingException;
import cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants.SimpleMQConstant;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
@ -26,18 +26,30 @@ import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* Message consume.
*/
@Slf4j
@Component
public class MessageConsume {
public class MessageConsumer {
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory")
public void receiveObject(SendMessageDTO simple) throws Exception {
TimeUnit.SECONDS.sleep(1);
ObjectMapper objectMapper = new ObjectMapper();
String message = objectMapper.writeValueAsString(simple);
log.info("consumer1 threadId {} Message: {}", Thread.currentThread().getId(),message);
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME))
public void receiveObject(SendMessageDTO simple) throws JsonProcessingException {
@RabbitListener(queuesToDeclare = @Queue(SimpleMQConstant.QUEUE_NAME), containerFactory = "defaultRabbitListenerContainerFactory")
public void receiveObject1(SendMessageDTO simple) throws Exception {
TimeUnit.SECONDS.sleep(1);
ObjectMapper objectMapper = new ObjectMapper();
String message = objectMapper.writeValueAsString(simple);
log.info("Message: {}", message);
log.info("consumer2 threadId {} Message: {}", Thread.currentThread().getId(),message);
}
}

@ -15,9 +15,10 @@
* limitations under the License.
*/
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example;
package cn.hippo4j.springboot.starter.adapter.rabbitmq.example.producer;
import cn.hippo4j.example.core.dto.SendMessageDTO;
import cn.hippo4j.springboot.starter.adapter.rabbitmq.example.constants.SimpleMQConstant;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -39,13 +40,15 @@ public class MessageProduce {
private final RabbitTemplate rabbitTemplate;
@GetMapping("/message/send")
public String sendMessage() {
String keys = UUID.randomUUID().toString();
SendMessageDTO payload = SendMessageDTO.builder()
.receiver("156011xxx91")
.uid(keys)
.build();
rabbitTemplate.convertAndSend(SimpleMQConstant.QUEUE_NAME, payload);
public String sendMessage(Integer count) {
for (int i = 0; i < count; i++) {
String keys = UUID.randomUUID().toString();
SendMessageDTO payload = SendMessageDTO.builder()
.receiver("156011xxx91")
.uid(keys)
.build();
rabbitTemplate.convertAndSend(SimpleMQConstant.QUEUE_NAME, payload);
}
return "success";
}
}
Loading…
Cancel
Save