配置文件加上注释

main
夜灬瞬 2 years ago
parent 40865b0bb1
commit a4dc0369ff

@ -8,14 +8,33 @@ import org.springframework.amqp.core.MessagePostProcessor;
import static com.tianji.common.constants.Constant.REQUEST_ID_HEADER; import static com.tianji.common.constants.Constant.REQUEST_ID_HEADER;
/**
* Java
* MessagePostProcessor
*
* ID
* @author
*/
public class BasicIdMessageProcessor implements MessagePostProcessor { public class BasicIdMessageProcessor implements MessagePostProcessor {
@Override @Override
public Message postProcessMessage(Message message) throws AmqpException { public Message postProcessMessage(Message message) throws AmqpException {
/*
使REQUEST_ID_HEADER(requestId)
MDCMapped Diagnostic ContextID
*/
String requestId = MDC.get(REQUEST_ID_HEADER); String requestId = MDC.get(REQUEST_ID_HEADER);
if (requestId == null) { if (requestId == null) {
/*
ID
使UUIDID
requestId
*/
requestId = UUID.randomUUID().toString(true); requestId = UUID.randomUUID().toString(true);
} }
// 写入RequestID标示 // 写入RequestID标示
/*
使setHeaderID使REQUEST_ID_HEADER
*/
message.getMessageProperties().setHeader(REQUEST_ID_HEADER, requestId); message.getMessageProperties().setHeader(REQUEST_ID_HEADER, requestId);
return message; return message;
} }

@ -5,14 +5,36 @@ import org.springframework.amqp.core.Message;
import java.time.Duration; import java.time.Duration;
/**
*
* @author yhs
*/
public class DelayedMessageProcessor extends BasicIdMessageProcessor { public class DelayedMessageProcessor extends BasicIdMessageProcessor {
private final long delay; private final long delay;
/**
* Duration
* delay
* @param delay
*/
public DelayedMessageProcessor(Duration delay) { public DelayedMessageProcessor(Duration delay) {
this.delay = delay.toMillis(); this.delay = delay.toMillis();
} }
/**
* postProcessMessage
*
* BasicIdMessageProcessorpostProcessMessage
* ID
*
*
* 使"x-delay"
*
* @param message
* @return
* @throws AmqpException
*/
@Override @Override
public Message postProcessMessage(Message message) throws AmqpException { public Message postProcessMessage(Message message) throws AmqpException {
// 1.添加消息id // 1.添加消息id

@ -29,6 +29,17 @@ import static com.tianji.common.constants.MqConstants.Key.ERROR_KEY_PREFIX;
import static com.tianji.common.constants.MqConstants.Queue.ERROR_QUEUE_TEMPLATE; import static com.tianji.common.constants.MqConstants.Queue.ERROR_QUEUE_TEMPLATE;
/**
* MqConfig RabbitMQ
* SpringEnvironmentAware
*
*
* <br/>
* {@code @ConditionalOnClass}
* MessageConverter AmqpTemplate
*
* @author
*/
@Configuration @Configuration
@ConditionalOnClass(value = {MessageConverter.class, AmqpTemplate.class}) @ConditionalOnClass(value = {MessageConverter.class, AmqpTemplate.class})
public class MqConfig implements EnvironmentAware{ public class MqConfig implements EnvironmentAware{
@ -36,15 +47,33 @@ public class MqConfig implements EnvironmentAware{
private String defaultErrorRoutingKey; private String defaultErrorRoutingKey;
private String defaultErrorQueue; private String defaultErrorQueue;
/**
* "rabbitListenerContainerFactory"bean
* beanRabbitMQ
* @param configurer
* @param connectionFactory
* @param simpleContainerCustomizer
* @return
*/
@Bean(name = "rabbitListenerContainerFactory") @Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple",
matchIfMissing = true) matchIfMissing = true)
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory( SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory,
ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) { ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) {
// 创建 SimpleRabbitListenerContainerFactory 对象
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 使用 configurer对象对 factory对象进行配置
configurer.configure(factory, connectionFactory); configurer.configure(factory, connectionFactory);
// 设置一个容器自定义器,用于对容器进行进一步的定制。
simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer); simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);
/*
"REQUEST_ID_HEADER"MDC
便ID
*/
factory.setAfterReceivePostProcessors(message -> { factory.setAfterReceivePostProcessors(message -> {
Object header = message.getMessageProperties().getHeader(REQUEST_ID_HEADER); Object header = message.getMessageProperties().getHeader(REQUEST_ID_HEADER);
if(header != null) { if(header != null) {
@ -55,6 +84,11 @@ public class MqConfig implements EnvironmentAware{
return factory; return factory;
} }
/**
* RabbitMQJSONJava
* @param mapper JSON
* @return
*/
@Bean @Bean
public MessageConverter messageConverter(ObjectMapper mapper){ public MessageConverter messageConverter(ObjectMapper mapper){
return new Jackson2JsonMessageConverter(mapper); return new Jackson2JsonMessageConverter(mapper);
@ -92,11 +126,21 @@ public class MqConfig implements EnvironmentAware{
return new DirectExchange(ERROR_EXCHANGE); return new DirectExchange(ERROR_EXCHANGE);
} }
/**
*
* @return
*/
@Bean @Bean
public Queue errorQueue(){ public Queue errorQueue(){
return new Queue(defaultErrorQueue, true); return new Queue(defaultErrorQueue, true);
} }
/**
*
* @param errorQueue
* @param errorMessageExchange
* @return
*/
@Bean @Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(defaultErrorRoutingKey); return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(defaultErrorRoutingKey);
@ -105,7 +149,9 @@ public class MqConfig implements EnvironmentAware{
@Override @Override
public void setEnvironment(Environment environment) { public void setEnvironment(Environment environment) {
String appName = environment.getProperty("spring.application.name"); String appName = environment.getProperty("spring.application.name");
// 设置错误队列的路由键 error. applicationName
this.defaultErrorRoutingKey = ERROR_KEY_PREFIX + appName; this.defaultErrorRoutingKey = ERROR_KEY_PREFIX + appName;
// 设置错误队列的队列名 error. applicationName .queue
this.defaultErrorQueue = StringUtils.format(ERROR_QUEUE_TEMPLATE, appName); this.defaultErrorQueue = StringUtils.format(ERROR_QUEUE_TEMPLATE, appName);
} }
} }

@ -14,6 +14,9 @@ import java.util.concurrent.ThreadPoolExecutor;
import static com.tianji.common.constants.Constant.REQUEST_ID_HEADER; import static com.tianji.common.constants.Constant.REQUEST_ID_HEADER;
/**
* @author
*/
@Slf4j @Slf4j
public class RabbitMqHelper { public class RabbitMqHelper {
@ -21,6 +24,14 @@ public class RabbitMqHelper {
private final MessagePostProcessor processor = new BasicIdMessageProcessor(); private final MessagePostProcessor processor = new BasicIdMessageProcessor();
private final ThreadPoolTaskExecutor executor; private final ThreadPoolTaskExecutor executor;
/**
* RabbitMQ
*
* ThreadPoolTaskExecutor
*
* RabbitMQ
* @param rabbitTemplate rabbitTemplate
*/
public RabbitMqHelper(RabbitTemplate rabbitTemplate) { public RabbitMqHelper(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate = rabbitTemplate;
executor = new ThreadPoolTaskExecutor(); executor = new ThreadPoolTaskExecutor();
@ -47,6 +58,7 @@ public class RabbitMqHelper {
log.debug("准备发送消息exchange{} RoutingKey{} message{}", exchange, routingKey,t); log.debug("准备发送消息exchange{} RoutingKey{} message{}", exchange, routingKey,t);
// 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理 // 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理
String id = UUID.randomUUID().toString(true); String id = UUID.randomUUID().toString(true);
// 使用UUID生成一个唯一的消息标识符并将其设置为CorrelationData对象
CorrelationData correlationData = new CorrelationData(id); CorrelationData correlationData = new CorrelationData(id);
// 2.设置发送超时时间为500毫秒 // 2.设置发送超时时间为500毫秒
rabbitTemplate.setReplyTimeout(500); rabbitTemplate.setReplyTimeout(500);
@ -60,6 +72,7 @@ public class RabbitMqHelper {
public <T> void sendDelayMessage(String exchange, String routingKey, T t, Duration delay) { public <T> void sendDelayMessage(String exchange, String routingKey, T t, Duration delay) {
// 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理 // 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理
String id = UUID.randomUUID().toString(true); String id = UUID.randomUUID().toString(true);
// 使用UUID生成一个唯一的消息标识符并将其设置为CorrelationData对象
CorrelationData correlationData = new CorrelationData(id); CorrelationData correlationData = new CorrelationData(id);
// 2.设置发送超时时间为500毫秒 // 2.设置发送超时时间为500毫秒
rabbitTemplate.setReplyTimeout(500); rabbitTemplate.setReplyTimeout(500);

Loading…
Cancel
Save