|
|
|
@ -0,0 +1,136 @@
|
|
|
|
|
package com.mashibing.push.mq;
|
|
|
|
|
|
|
|
|
|
import com.mashibing.common.constant.RabbitMQConstant;
|
|
|
|
|
import com.mashibing.common.pojo.StandardReport;
|
|
|
|
|
import com.mashibing.common.utils.JsonUtil;
|
|
|
|
|
import com.mashibing.push.config.RabbitMQConfig;
|
|
|
|
|
import com.mashibing.push.feignclient.Cacheclient;
|
|
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
|
|
import org.springframework.amqp.AmqpException;
|
|
|
|
|
import org.springframework.amqp.core.Message;
|
|
|
|
|
import org.springframework.amqp.core.MessagePostProcessor;
|
|
|
|
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
|
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.http.HttpEntity;
|
|
|
|
|
import org.springframework.http.HttpHeaders;
|
|
|
|
|
import org.springframework.http.MediaType;
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
import org.springframework.web.client.RestClientException;
|
|
|
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @author heqijun
|
|
|
|
|
* @ClassName: PushReportListener
|
|
|
|
|
* @Description: 监听推送短信状态报告队列
|
|
|
|
|
* @date 2025/6/12 19:49
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
@Slf4j
|
|
|
|
|
@Component
|
|
|
|
|
public class PushReportListener {
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
RestTemplate restTemplate;
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
Cacheclient cacheclient;
|
|
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
RabbitTemplate rabbitTemplate;
|
|
|
|
|
|
|
|
|
|
private static final String SUCCESS = "SUCCESS";
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 重试时间间隔
|
|
|
|
|
*/
|
|
|
|
|
private static final int[] RETRY_INTERVAL_SECONDS = {0, 15000, 30000, 60000, 300000};
|
|
|
|
|
|
|
|
|
|
@RabbitListener(queues = {RabbitMQConstant.SMS_PUSH_REPORT})
|
|
|
|
|
public void consume(StandardReport report, Channel channel, Message message) throws IOException {
|
|
|
|
|
log.info("【推送模块】接收到状态报告消息: {}", report);
|
|
|
|
|
String callbackUrl = report.getCallbackUrl();
|
|
|
|
|
|
|
|
|
|
// 需要推送报告
|
|
|
|
|
if (report.getIsCallback() == 1L) {
|
|
|
|
|
if (StringUtils.isBlank(callbackUrl)) {
|
|
|
|
|
log.error("【推送模块】客户推送地址为空,无法推送!!!callbackUrl={}", callbackUrl);
|
|
|
|
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// 推送报告
|
|
|
|
|
retry(report);
|
|
|
|
|
} else {
|
|
|
|
|
// 不需要推送报告
|
|
|
|
|
log.info("【推送模块】客户不需要推送报告!!!");
|
|
|
|
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
ack(channel, message);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@RabbitListener(queues = {RabbitMQConfig.DELAYED_QUEUE})
|
|
|
|
|
public void delayConsume(StandardReport report, Channel channel, Message message) throws IOException {
|
|
|
|
|
|
|
|
|
|
retry(report);
|
|
|
|
|
//手动ack
|
|
|
|
|
ack(channel, message);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static void ack(Channel channel, Message message) throws IOException {
|
|
|
|
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
|
|
log.info("【推送模块】消费消息完毕,手动ack!!!");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean pushReport(StandardReport report) {
|
|
|
|
|
log.info("【推送模块】推送时间:{}", LocalDateTime.now());
|
|
|
|
|
boolean flag = false;
|
|
|
|
|
// 声明发送参数
|
|
|
|
|
String body = JsonUtil.objToJson(report);
|
|
|
|
|
|
|
|
|
|
// 声明restTemplate模板代码
|
|
|
|
|
HttpHeaders headers = new HttpHeaders();
|
|
|
|
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
|
|
|
|
HttpEntity<Object> httpEntity = new HttpEntity<>(body, headers);
|
|
|
|
|
|
|
|
|
|
//发请求
|
|
|
|
|
try {
|
|
|
|
|
report.setPushTimes(report.getPushTimes() + 1);
|
|
|
|
|
String response = restTemplate.postForObject("http://" + report.getCallbackUrl(), httpEntity, String.class);
|
|
|
|
|
flag = SUCCESS.equals(response);
|
|
|
|
|
} catch (RestClientException e) {
|
|
|
|
|
log.error("【推送模块】推送消息到客户异常:errorMessage{}", e.getMessage());
|
|
|
|
|
// e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
return flag;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void retry(StandardReport report) {
|
|
|
|
|
|
|
|
|
|
// 推送报告
|
|
|
|
|
log.info("【推送模块】第{}次推送状态报告开始!!!report={}", report.getPushTimes() + 1, report);
|
|
|
|
|
boolean flag = pushReport(report);
|
|
|
|
|
if (!flag) {
|
|
|
|
|
// 重试
|
|
|
|
|
log.info("【推送模块】第{}次推送消息失败!!!report={}", report.getPushTimes(), report);
|
|
|
|
|
if (report.getPushTimes() >= 5) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
// 放到延迟队列中
|
|
|
|
|
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE, "", report, message1 -> {
|
|
|
|
|
//设置延迟时间
|
|
|
|
|
message1.getMessageProperties().setDelay(RETRY_INTERVAL_SECONDS[report.getPushTimes()]);
|
|
|
|
|
return message1;
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
log.info("【推送模块】第{}次推送消息成功!!!report={}", report.getPushTimes() + 1, report);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|