From 86e6d4df2ce5e025cb5ab1d2b929d44e01670797 Mon Sep 17 00:00:00 2001 From: heqijun Date: Thu, 12 Jun 2025 23:12:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9E=84=E5=BB=BA=E6=8E=A8=E9=80=81=E6=A8=A1?= =?UTF-8?q?=E5=9D=97beacon-push=20+=20=E5=AE=9E=E7=8E=B0=E6=8E=A8=E9=80=81?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E6=8A=A5=E5=91=8A=20+=20=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=BB=B6=E8=BF=9F=E9=98=9F=E5=88=97=E6=8E=A8=E9=80=81=E4=BA=94?= =?UTF-8?q?=E6=AC=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mashibing/common/pojo/StandardReport.java | 8 ++ .../com/mashibing/common/utils/JsonUtil.java | 130 ++++++++++++++++- beacon-push/pom.xml | 58 ++++++++ .../com/mashibing/push/PushApplication.java | 22 +++ .../mashibing/push/config/RabbitMQConfig.java | 46 ++++++ .../push/config/RestTemplateConfig.java | 23 +++ .../push/feignclient/Cacheclient.java | 15 ++ .../mashibing/push/mq/PushReportListener.java | 136 ++++++++++++++++++ beacon-push/src/main/resources/bootstrap.yml | 17 +++ .../search/mq/SmsWriteLogListener.java | 2 +- pom.xml | 1 + 11 files changed, 453 insertions(+), 5 deletions(-) create mode 100644 beacon-push/pom.xml create mode 100644 beacon-push/src/main/java/com/mashibing/push/PushApplication.java create mode 100644 beacon-push/src/main/java/com/mashibing/push/config/RabbitMQConfig.java create mode 100644 beacon-push/src/main/java/com/mashibing/push/config/RestTemplateConfig.java create mode 100644 beacon-push/src/main/java/com/mashibing/push/feignclient/Cacheclient.java create mode 100644 beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java create mode 100644 beacon-push/src/main/resources/bootstrap.yml diff --git a/beacon-common/src/main/java/com/mashibing/common/pojo/StandardReport.java b/beacon-common/src/main/java/com/mashibing/common/pojo/StandardReport.java index b87159c..4afbc1f 100644 --- a/beacon-common/src/main/java/com/mashibing/common/pojo/StandardReport.java +++ b/beacon-common/src/main/java/com/mashibing/common/pojo/StandardReport.java @@ -1,5 +1,9 @@ package com.mashibing.common.pojo; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import com.mashibing.common.annotation.Description; import lombok.AllArgsConstructor; import lombok.Data; @@ -32,6 +36,8 @@ public class StandardReport implements Serializable { @Description("目标手机号,客户请求传递的") private String mobile; + @JsonSerialize(using = LocalDateTimeSerializer.class) + @JsonDeserialize(using = LocalDateTimeDeserializer.class) @Description("短信的发送时间,当前系统时间") private LocalDateTime sendTime; @@ -47,5 +53,7 @@ public class StandardReport implements Serializable { @Description("回调url") private String callbackUrl; + @Description("推送报告推送次数") + private Integer pushTimes = 0; } diff --git a/beacon-common/src/main/java/com/mashibing/common/utils/JsonUtil.java b/beacon-common/src/main/java/com/mashibing/common/utils/JsonUtil.java index cf5301c..26881a7 100644 --- a/beacon-common/src/main/java/com/mashibing/common/utils/JsonUtil.java +++ b/beacon-common/src/main/java/com/mashibing/common/utils/JsonUtil.java @@ -2,23 +2,145 @@ package com.mashibing.common.utils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.SerializationFeature; + +import java.util.Map; /** * @author heqijun * @ClassName: JsonUtil - * @Description: TODO(这里用一句话描述这个类的作用) + * @Description: JsonUtil工具类 * @date 2025/6/12 18:46 */ public class JsonUtil { - private static ObjectMapper objectMapper = new ObjectMapper(); + private static final ObjectMapper objectMapper = new ObjectMapper(); + + static { + // 配置ObjectMapper + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + } - public static String obj2Json(Object obj) { + /** + * 将对象转换为JSON字符串 + * + * @param obj 任意Java对象 + * @return JSON字符串 + */ + public static String objToJson(Object obj) { try { return objectMapper.writeValueAsString(obj); } catch (JsonProcessingException e) { - throw new RuntimeException("转换JSON失败!!!"); + throw new RuntimeException("对象转JSON失败", e); + } + } + + /** + * 将JSON字符串转换为指定类型的对象 + * + * @param json JSON字符串 + * @param clazz 目标对象类型 + * @return 转换后的对象 + */ + public static T jsonToObj(String json, Class clazz) { + try { + return objectMapper.readValue(json, clazz); + } catch (JsonProcessingException e) { + throw new RuntimeException("JSON转对象失败", e); + } + } + + /** + * 将JSON字符串转换为Map + * + * @param json JSON字符串 + * @return Map对象 + */ + public static Map jsonToMap(String json) { + try { + return objectMapper.readValue(json, new TypeReference>() {}); + } catch (JsonProcessingException e) { + throw new RuntimeException("JSON转Map失败", e); + } + } + + /** + * 将Map转换为JSON字符串 + * + * @param map Map对象 + * @return JSON字符串 + */ + public static String mapToJson(Map map) { + try { + return objectMapper.writeValueAsString(map); + } catch (JsonProcessingException e) { + throw new RuntimeException("Map转JSON失败", e); + } + } + + /** + * 将Map转换为指定类型的对象 + * + * @param map Map对象 + * @param clazz 目标对象类型 + * @return 转换后的对象 + */ + public static T mapToObj(Map map, Class clazz) { + try { + String json = mapToJson(map); + return jsonToObj(json, clazz); + } catch (Exception e) { + throw new RuntimeException("Map转对象失败", e); + } + } + + /** + * 将对象转换为Map + * + * @param obj 任意Java对象 + * @return Map对象 + */ + public static Map objToMap(Object obj) { + try { + String json = objToJson(obj); + return jsonToMap(json); + } catch (Exception e) { + throw new RuntimeException("对象转Map失败", e); + } + } + + /** + * 通用转换方法,支持任意类型转换 + * + * @param source 源数据 + * @param targetType 目标类型 + * @return 转换后的结果 + */ + @SuppressWarnings("unchecked") + public static T convert(Object source, Class targetType) { + if (source == null) { + return null; + } + + if (targetType.isAssignableFrom(source.getClass())) { + return (T) source; + } + + if (source instanceof String) { + // JSON字符串转对象 + return jsonToObj((String) source, targetType); + } else if (source instanceof Map) { + // Map转对象 + return mapToObj((Map) source, targetType); + } else { + // 对象转JSON再转目标类型 + String json = objToJson(source); + return jsonToObj(json, targetType); } } } diff --git a/beacon-push/pom.xml b/beacon-push/pom.xml new file mode 100644 index 0000000..7ce416e --- /dev/null +++ b/beacon-push/pom.xml @@ -0,0 +1,58 @@ + + + 4.0.0 + + com.mashibing + beacon-cloud + 1.0-SNAPSHOT + + + beacon-push + + + + + org.springframework.boot + spring-boot-starter-web + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + org.springframework.cloud + spring-cloud-starter-openfeign + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.boot + spring-boot-starter-test + + + + org.projectlombok + lombok + + + + + com.mashibing + beacon-common + 1.0-SNAPSHOT + + + + \ No newline at end of file diff --git a/beacon-push/src/main/java/com/mashibing/push/PushApplication.java b/beacon-push/src/main/java/com/mashibing/push/PushApplication.java new file mode 100644 index 0000000..82d4d31 --- /dev/null +++ b/beacon-push/src/main/java/com/mashibing/push/PushApplication.java @@ -0,0 +1,22 @@ +package com.mashibing.push; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.cloud.openfeign.EnableFeignClients; + +/** + * @author heqijun + * @ClassName: PushApplication + * @Description: 推送模块启动类 + * @date 2025/6/12 19:41 + */ + +@SpringBootApplication +@EnableFeignClients +@EnableDiscoveryClient +public class PushApplication { + public static void main(String[] args) { + SpringApplication.run(PushApplication.class, args); + } +} diff --git a/beacon-push/src/main/java/com/mashibing/push/config/RabbitMQConfig.java b/beacon-push/src/main/java/com/mashibing/push/config/RabbitMQConfig.java new file mode 100644 index 0000000..37476ed --- /dev/null +++ b/beacon-push/src/main/java/com/mashibing/push/config/RabbitMQConfig.java @@ -0,0 +1,46 @@ +package com.mashibing.push.config; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author heqijun + * @ClassName: RabbitMQConfig + * @Description: 声明延迟交换机并绑定队列 + * @date 2025/6/12 21:35 + */ + +@Configuration +public class RabbitMQConfig { + + public static final String DELAYED_EXCHANGE = "push_delayed_exchange"; + + public static final String DELAYED_QUEUE = "push_delayed_queue"; + + private static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message"; + + private static final String DELAYED_ROUTING_TYPE_KEY = "x-delayed-type"; + + private static final String DELAYED_ROUTING_TYPE_FANOUT = "fanout"; + + @Bean + public Exchange delayedExchange() { + Map args = new HashMap(); + args.put(DELAYED_ROUTING_TYPE_KEY, DELAYED_ROUTING_TYPE_FANOUT); + return new CustomExchange(DELAYED_EXCHANGE, DELAYED_EXCHANGE_TYPE, true, false, args); + } + + @Bean + public Queue delayedQueue() { + return QueueBuilder.durable(DELAYED_QUEUE).build(); + } + + @Bean + public Binding delayedExchangeBinding(Exchange delayedExchange, Queue delayedQueue) { + return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("").noargs(); + } +} diff --git a/beacon-push/src/main/java/com/mashibing/push/config/RestTemplateConfig.java b/beacon-push/src/main/java/com/mashibing/push/config/RestTemplateConfig.java new file mode 100644 index 0000000..e502a49 --- /dev/null +++ b/beacon-push/src/main/java/com/mashibing/push/config/RestTemplateConfig.java @@ -0,0 +1,23 @@ +package com.mashibing.push.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +/** + * @author heqijun + * @ClassName: RestTemplateConfig + * @Description: RestTemplateConfig配置类 + * @date 2025/6/12 19:54 + */ + +@Slf4j +@Component +public class RestTemplateConfig { + + @Bean + public RestTemplate restTemplate() { + return new RestTemplate(); + } +} diff --git a/beacon-push/src/main/java/com/mashibing/push/feignclient/Cacheclient.java b/beacon-push/src/main/java/com/mashibing/push/feignclient/Cacheclient.java new file mode 100644 index 0000000..cc98ecf --- /dev/null +++ b/beacon-push/src/main/java/com/mashibing/push/feignclient/Cacheclient.java @@ -0,0 +1,15 @@ +package com.mashibing.push.feignclient; + +import com.mashibing.common.clients.BeaconCacheClient; +import org.springframework.cloud.openfeign.FeignClient; + +/** + * @author heqijun + * @ClassName: Cacheclient + * @Description: TODO(这里用一句话描述这个类的作用) + * @date 2025/6/12 19:55 + */ + +@FeignClient("beacon-cache") +public interface Cacheclient extends BeaconCacheClient { +} diff --git a/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java b/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java new file mode 100644 index 0000000..772c3ac --- /dev/null +++ b/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java @@ -0,0 +1,136 @@ +package com.mashibing.push.mq; + +import com.mashibing.common.constant.RabbitMQConstant; +import com.mashibing.common.pojo.StandardReport; +import com.mashibing.common.utils.JsonUtil; +import com.mashibing.push.config.RabbitMQConfig; +import com.mashibing.push.feignclient.Cacheclient; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; + +import java.io.IOException; +import java.time.LocalDateTime; + +/** + * @author heqijun + * @ClassName: PushReportListener + * @Description: 监听推送短信状态报告队列 + * @date 2025/6/12 19:49 + */ + +@Slf4j +@Component +public class PushReportListener { + + @Autowired + RestTemplate restTemplate; + + @Autowired + Cacheclient cacheclient; + + @Autowired + RabbitTemplate rabbitTemplate; + + private static final String SUCCESS = "SUCCESS"; + + /** + * 重试时间间隔 + */ + private static final int[] RETRY_INTERVAL_SECONDS = {0, 15000, 30000, 60000, 300000}; + + @RabbitListener(queues = {RabbitMQConstant.SMS_PUSH_REPORT}) + public void consume(StandardReport report, Channel channel, Message message) throws IOException { + log.info("【推送模块】接收到状态报告消息: {}", report); + String callbackUrl = report.getCallbackUrl(); + + // 需要推送报告 + if (report.getIsCallback() == 1L) { + if (StringUtils.isBlank(callbackUrl)) { + log.error("【推送模块】客户推送地址为空,无法推送!!!callbackUrl={}", callbackUrl); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + return; + } + // 推送报告 + retry(report); + } else { + // 不需要推送报告 + log.info("【推送模块】客户不需要推送报告!!!"); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + return; + } + ack(channel, message); + } + + @RabbitListener(queues = {RabbitMQConfig.DELAYED_QUEUE}) + public void delayConsume(StandardReport report, Channel channel, Message message) throws IOException { + + retry(report); + //手动ack + ack(channel, message); + } + + private static void ack(Channel channel, Message message) throws IOException { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + log.info("【推送模块】消费消息完毕,手动ack!!!"); + } + + private boolean pushReport(StandardReport report) { + log.info("【推送模块】推送时间:{}", LocalDateTime.now()); + boolean flag = false; + // 声明发送参数 + String body = JsonUtil.objToJson(report); + + // 声明restTemplate模板代码 + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + HttpEntity httpEntity = new HttpEntity<>(body, headers); + + //发请求 + try { + report.setPushTimes(report.getPushTimes() + 1); + String response = restTemplate.postForObject("http://" + report.getCallbackUrl(), httpEntity, String.class); + flag = SUCCESS.equals(response); + } catch (RestClientException e) { + log.error("【推送模块】推送消息到客户异常:errorMessage{}", e.getMessage()); + // e.printStackTrace(); + } + return flag; + } + + private void retry(StandardReport report) { + + // 推送报告 + log.info("【推送模块】第{}次推送状态报告开始!!!report={}", report.getPushTimes() + 1, report); + boolean flag = pushReport(report); + if (!flag) { + // 重试 + log.info("【推送模块】第{}次推送消息失败!!!report={}", report.getPushTimes(), report); + if (report.getPushTimes() >= 5) { + return; + } + // 放到延迟队列中 + rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE, "", report, message1 -> { + //设置延迟时间 + message1.getMessageProperties().setDelay(RETRY_INTERVAL_SECONDS[report.getPushTimes()]); + return message1; + }); + } else { + log.info("【推送模块】第{}次推送消息成功!!!report={}", report.getPushTimes() + 1, report); + } + } + + +} diff --git a/beacon-push/src/main/resources/bootstrap.yml b/beacon-push/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..1429329 --- /dev/null +++ b/beacon-push/src/main/resources/bootstrap.yml @@ -0,0 +1,17 @@ +# 服务名称 +spring: + application: + name: beacon-push + # 多环境 + profiles: + active: dev + # nacos注册中心地址 + cloud: + nacos: + discovery: + server-addr: 192.168.1.13:8848 + # nacos配置中心地址: + config: + server-addr: 192.168.1.13:8848 + file-extension: yml + # beacon-push-dev.yml \ No newline at end of file diff --git a/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java index 2758395..079a902 100644 --- a/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java +++ b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java @@ -36,7 +36,7 @@ public class SmsWriteLogListener { log.info("接收到存储日志的信息,submit={}", submit); String sequenceId = submit.getSequenceId().toString(); - String json = JsonUtil.obj2Json(submit); + String json = JsonUtil.objToJson(submit); String year = LocalDateTime.now().getYear() + ""; searchService.index(INDEX + year, sequenceId, json); diff --git a/pom.xml b/pom.xml index 0d4fad5..81e7138 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,7 @@ beacon-test beacon-strategy beacon-search + beacon-push