diff --git a/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstant.java b/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstant.java index 9215f58..de55856 100644 --- a/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstant.java +++ b/beacon-common/src/main/java/com/mashibing/common/constant/RabbitMQConstant.java @@ -32,4 +32,12 @@ public interface RabbitMQConstant { * */ String SMS_GATEWAY = "sms_gateway_topic_"; + + /** + * 短信网关模块涉及到的私信队列需要的信息 + */ + String SMS_GATEWAY_NORMAL_EXCHANGE = "sms_gateway_normal_exchange"; + String SMS_GATEWAY_NORMAL_QUEUE = "sms_gateway_normal_queue"; + String SMS_GATEWAY_DEAD_EXCHANGE = "sms_gateway_dead_exchange"; + String SMS_GATEWAY_DEAD_QUEUE = "sms_gateway_dead_queue"; } diff --git a/beacon-common/src/main/java/com/mashibing/common/constant/SearchConstant.java b/beacon-common/src/main/java/com/mashibing/common/constant/SearchConstant.java new file mode 100644 index 0000000..545579c --- /dev/null +++ b/beacon-common/src/main/java/com/mashibing/common/constant/SearchConstant.java @@ -0,0 +1,16 @@ +package com.mashibing.common.constant; + +/** + * @author heqijun + * @ClassName: SearchConstant + * @Description: 搜索模块常量 + * @date 2025/6/18 20:30 + */ + +public interface SearchConstant { + + /** + * 短信日志索引前缀 + */ + String INDEX_PREFIX = "sms_submit_log_"; +} diff --git a/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java b/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java index 809105f..1205cad 100644 --- a/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java +++ b/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java @@ -27,7 +27,9 @@ public enum ExceptionEnums { LIMIT_MINUTE(-16, "达到分钟限流阈值!!"), LIMIT_HOUR(-17, "达到小时限流阈值!!"), NO_CHANNEL(-18, "没有可用通道!!!"), - SEARCH_INDEX_ERROR(-20, "es添加一行文档失败!!!"); + SEARCH_INDEX_ERROR(-20, "es添加一行文档失败!!!"), + SEARCH_UPDATE_ERROR(-21, "es更新文档失败!!!"), + ; private final int code; diff --git a/beacon-common/src/main/java/com/mashibing/common/pojo/StandardReport.java b/beacon-common/src/main/java/com/mashibing/common/pojo/StandardReport.java index 4afbc1f..dada0f5 100644 --- a/beacon-common/src/main/java/com/mashibing/common/pojo/StandardReport.java +++ b/beacon-common/src/main/java/com/mashibing/common/pojo/StandardReport.java @@ -56,4 +56,10 @@ public class StandardReport implements Serializable { @Description("推送报告推送次数") private Integer pushTimes = 0; + @Description("apikey") + private String apikey; + + @Description("日志更新次数") + private Integer updateTimes = 0; + } diff --git a/beacon-common/src/main/java/com/mashibing/common/utils/CMPPDeliverMapUtil.java b/beacon-common/src/main/java/com/mashibing/common/utils/CMPPDeliverMapUtil.java index 7dd9061..abe789f 100644 --- a/beacon-common/src/main/java/com/mashibing/common/utils/CMPPDeliverMapUtil.java +++ b/beacon-common/src/main/java/com/mashibing/common/utils/CMPPDeliverMapUtil.java @@ -15,8 +15,8 @@ public class CMPPDeliverMapUtil { private static ConcurrentHashMap tempReport = new ConcurrentHashMap<>(); - public static void put(String msgId, StandardReport submit) { - tempReport.put(msgId, submit); + public static void put(String msgId, StandardReport report) { + tempReport.put(msgId, report); } public static StandardReport get(String msgId) { diff --git a/beacon-search/src/main/java/com/mashibing/search/mq/SmsUpdateLogListener.java b/beacon-search/src/main/java/com/mashibing/search/mq/SmsUpdateLogListener.java new file mode 100644 index 0000000..f1723d1 --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/mq/SmsUpdateLogListener.java @@ -0,0 +1,67 @@ +package com.mashibing.search.mq; + +import com.mashibing.common.constant.RabbitMQConstant; +import com.mashibing.common.constant.SearchConstant; +import com.mashibing.common.pojo.StandardReport; +import com.mashibing.search.service.SearchService; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +/** + * @author heqijun + * @ClassName: SmsUpdateLogListener + * @Description: 搜索模块监听更新日志队列 + * @date 2025/6/18 18:48 + */ + +@Slf4j +@Component +public class SmsUpdateLogListener { + + @Autowired + SearchService searchService; + + @Autowired + RabbitTemplate rabbitTemplate; + + private static final int UPDATE_TIMES_MAX = 3; + + @RabbitListener(queues = {RabbitMQConstant.SMS_GATEWAY_DEAD_QUEUE}) + public void consume(StandardReport report, Channel channel, Message message) throws IOException { + log.info("【搜索模块-更新日志】接收到更新日志的信息,report={}", report); + // 投递次数+1 + report.setUpdateTimes(report.getUpdateTimes() + 1); + String sequenceId = report.getSequenceId().toString(); + int year = LocalDateTime.now().getYear(); + boolean exist = searchService.exist(SearchConstant.INDEX_PREFIX + year, sequenceId); + if (exist) { + // 存在,更新日志 + Map map = new HashMap<>(); + map.put("reportState", report.getReportState()); + searchService.update(SearchConstant.INDEX_PREFIX + year, sequenceId, map); +// log.info("【搜索模块-更新日志】更新成功!!!更新字段:{},report={}", map, report); + } else { + // 不存在 + if (report.getUpdateTimes() >= UPDATE_TIMES_MAX) { + // 达到最大投递次数,不扔回队列,而是直接记录日志 + log.error("【搜索模块-更新日志】已达到最大投递次数,更新失败report={}", report); + } else { + // 将消息扔回队列中 + log.info("【搜索模块-更新日志】ES中不存在当前日志,投递次数:{},report={}", report.getUpdateTimes(), report); + rabbitTemplate.convertAndSend(RabbitMQConstant.SMS_GATEWAY_NORMAL_EXCHANGE,"", report); + } + } + //手动ack + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } +} diff --git a/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java index 079a902..b732187 100644 --- a/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java +++ b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java @@ -1,6 +1,7 @@ package com.mashibing.search.mq; import com.mashibing.common.constant.RabbitMQConstant; +import com.mashibing.common.constant.SearchConstant; import com.mashibing.common.pojo.StandardSubmit; import com.mashibing.common.utils.JsonUtil; import com.mashibing.search.service.SearchService; @@ -28,17 +29,16 @@ public class SmsWriteLogListener { @Autowired SearchService searchService; - private final String INDEX = "sms_submit_log_"; @RabbitListener(queues = {RabbitMQConstant.SMS_WRITE_LOG}) public void process(StandardSubmit submit, Channel channel, Message message) throws IOException { - log.info("接收到存储日志的信息,submit={}", submit); + log.info("【搜索模块-写入日志】接收到写入日志的信息,submit={}", submit); String sequenceId = submit.getSequenceId().toString(); String json = JsonUtil.objToJson(submit); String year = LocalDateTime.now().getYear() + ""; - searchService.index(INDEX + year, sequenceId, json); + searchService.index(SearchConstant.INDEX_PREFIX + year, sequenceId, json); //手动ack diff --git a/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java b/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java index 8658a11..a269a61 100644 --- a/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java +++ b/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java @@ -1,6 +1,7 @@ package com.mashibing.search.service; import java.io.IOException; +import java.util.Map; /** * @author heqijun @@ -14,9 +15,28 @@ public interface SearchService { /** * 向es中添加一行文档 * - * @param index 索引信息 + * @param index 索引名称 * @param id 文档id - * @param json 文档内容 + * @param json 文档内容json */ void index(String index, String id, String json) throws IOException; + + /** + * 查询指定文档是否存在 + * + * @param index 索引名称 + * @param id 文档id + * @return true存在/false不存在 + * @throws IOException + */ + boolean exist(String index, String id) throws IOException; + + /** + * @param index 索引名称 + * @param id 文档id + * @param doc 要修改文档内容 + * @throws IOException + */ + void update(String index, String id, Map doc) throws IOException; + } diff --git a/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticSearchServiceImpl.java b/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticSearchServiceImpl.java index 99e0520..b8f3ae5 100644 --- a/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticSearchServiceImpl.java +++ b/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticSearchServiceImpl.java @@ -4,8 +4,11 @@ import com.mashibing.common.enums.ExceptionEnums; import com.mashibing.common.exception.SearchException; import com.mashibing.search.service.SearchService; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; @@ -13,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.util.Map; /** * @author heqijun @@ -27,6 +31,8 @@ public class ElasticSearchServiceImpl implements SearchService { private static String CREATED = "created"; + private static String UPDATED = "updated"; + @Autowired RestHighLevelClient client; @@ -49,4 +55,20 @@ public class ElasticSearchServiceImpl implements SearchService { log.info("【搜索模块-写入数据】成功!!!index = {},id = {},json = {},result = {}", index, id, json, result); } + + @Override + public boolean exist(String index, String id) throws IOException { + return client.exists(new GetRequest().index(index).id(id), RequestOptions.DEFAULT); + } + + @Override + public void update(String index, String id, Map doc) throws IOException { + UpdateResponse updateResponse = client.update(new UpdateRequest().index(index).id(id).doc(doc), RequestOptions.DEFAULT); + String result = updateResponse.getResult().getLowercase(); + if (!UPDATED.equals(result)) { + log.error("【搜索模块-更新数据】失败!!!index = {},id = {},doc = {},result = {}", index, id, doc, result); + throw new SearchException(ExceptionEnums.SEARCH_UPDATE_ERROR); + } + log.info("【搜索模块-更新数据】成功!!!index = {},id = {},doc = {},result = {}", index, id, doc, result); + } } diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/client/CacheClient.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/client/CacheClient.java new file mode 100644 index 0000000..c2fd0c7 --- /dev/null +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/client/CacheClient.java @@ -0,0 +1,15 @@ +package com.mashibing.smsgateway.client; + +import com.mashibing.common.clients.BeaconCacheClient; +import org.springframework.cloud.openfeign.FeignClient; + +/** + * @author heqijun + * @ClassName: CacheClient + * @Description: 缓存模块openFeignClient接口 + * @date 2025/6/17 20:28 + */ + +@FeignClient("beacon-cache") +public interface CacheClient extends BeaconCacheClient { +} diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/RabbitMQConfig.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/RabbitMQConfig.java index fa30546..b8c8aae 100644 --- a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/RabbitMQConfig.java +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/config/RabbitMQConfig.java @@ -1,9 +1,15 @@ package com.mashibing.smsgateway.config; -import org.springframework.amqp.core.AcknowledgeMode; +import com.mashibing.common.constant.RabbitMQConstant; +import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static com.mashibing.common.constant.RabbitMQConstant.SMS_GATEWAY_DEAD_EXCHANGE; +import static com.mashibing.common.constant.RabbitMQConstant.SMS_GATEWAY_DEAD_QUEUE; /** * @author heqijun @@ -12,18 +18,46 @@ import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainer * @date 2025/6/13 16:04 */ -//@Configuration +@Configuration public class RabbitMQConfig { - // @Bean - public SimpleRabbitListenerContainerFactory gatewayContainerFactory(ConnectionFactory connectionFactory, - SimpleRabbitListenerContainerFactoryConfigurer configurer) { - SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory(); - simpleRabbitListenerContainerFactory.setConcurrentConsumers(5); - simpleRabbitListenerContainerFactory.setPrefetchCount(10); - simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); - configurer.configure(simpleRabbitListenerContainerFactory, connectionFactory); - return simpleRabbitListenerContainerFactory; + private final int TTL = 10000; + + private final String FANOUT_ROUTING_KEY = ""; + + // 声明死信队列,需要准备普通交换机,普通队列,死信交换机,死信队列 + @Bean + public Exchange normalExchange() { + return ExchangeBuilder.fanoutExchange(RabbitMQConstant.SMS_GATEWAY_NORMAL_EXCHANGE).build(); + } + + @Bean + public Queue normalQueue() { + return QueueBuilder.durable(RabbitMQConstant.SMS_GATEWAY_NORMAL_QUEUE) + .withArgument("x-message-ttl", TTL) + .withArgument("x-dead-letter-exchange", RabbitMQConstant.SMS_GATEWAY_DEAD_EXCHANGE) + .withArgument("x-dead-letter-routing-key", FANOUT_ROUTING_KEY) + .build(); + } + + @Bean + public Binding normalBinding(Exchange normalExchange, Queue normalQueue) { + return BindingBuilder.bind(normalQueue).to(normalExchange).with("").noargs(); + } + + @Bean + public Exchange deadExchange() { + return ExchangeBuilder.fanoutExchange(SMS_GATEWAY_DEAD_EXCHANGE).build(); + } + + @Bean + public Queue deadQueue() { + return QueueBuilder.durable(SMS_GATEWAY_DEAD_QUEUE).build(); + } + + @Bean + public Binding deadBinding(Exchange deadExchange, Queue deadQueue) { + return BindingBuilder.bind(deadQueue).to(deadExchange).with("").noargs(); } } \ No newline at end of file diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPHandler.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPHandler.java index 00d2e89..658f8a8 100644 --- a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPHandler.java +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/netty4/CMPPHandler.java @@ -1,6 +1,13 @@ package com.mashibing.smsgateway.netty4; - +import com.mashibing.common.constant.SMSConstant; +import com.mashibing.common.pojo.StandardReport; +import com.mashibing.common.pojo.StandardSubmit; +import com.mashibing.common.utils.CMPP2ResultUtil; +import com.mashibing.common.utils.CMPPSubmitRepoMapUtil; +import com.mashibing.smsgateway.runnable.DeliverRunnable; +import com.mashibing.smsgateway.runnable.SubmitRepoRunnable; +import com.mashibing.smsgateway.utils.SpringUtil; import com.mashibing.smsgateway.netty4.entity.CmppDeliver; import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp; import com.mashibing.smsgateway.netty4.utils.MsgUtils; @@ -8,12 +15,15 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.BeanUtils; +import java.util.concurrent.ThreadPoolExecutor; /** * 主要业务 handler,运营商响应信息 */ public class CMPPHandler extends SimpleChannelInboundHandler { + private final static Logger log = LoggerFactory.getLogger(CMPPHandler.class); @Override @@ -22,9 +32,14 @@ public class CMPPHandler extends SimpleChannelInboundHandler { if (msg instanceof CmppSubmitResp){ CmppSubmitResp resp=(CmppSubmitResp)msg; log.info("-------------接收到短信提交应答-------------"); - log.info("----自增id:"+resp.getSequenceId()); - log.info("----状态:"+ resp.getResult()); - log.info("----第一次响应:"+resp.getMsgId()); + log.info("----自增id:{}", resp.getSequenceId()); + log.info("----状态:{}", resp.getResult()); + log.info("----第一次响应:{}", resp.getMsgId()); + + ThreadPoolExecutor cmppSubmitPool = SpringUtil.getBeanByName("cmppSubmitPool", ThreadPoolExecutor.class); + + // 将任务提交给线程池处理 + cmppSubmitPool.execute(new SubmitRepoRunnable(resp)); } if (msg instanceof CmppDeliver){ @@ -33,12 +48,17 @@ public class CMPPHandler extends SimpleChannelInboundHandler { if (resp.getRegistered_Delivery() == 1) { // 如果是状态报告的话 log.info("-------------状态报告---------------"); - log.info("----第二次响应:"+resp.getMsg_Id_DELIVRD()); - log.info("----手机号:"+resp.getDest_terminal_Id()); - log.info("----状态:"+resp.getStat()); + log.info("----第二次响应:{}", resp.getMsg_Id_DELIVRD()); + log.info("----手机号:{}", resp.getDest_terminal_Id()); + log.info("----状态:{}", resp.getStat()); + + ThreadPoolExecutor cmppDeliverPool = SpringUtil.getBeanByName("cmppDeliverPool", ThreadPoolExecutor.class); + + cmppDeliverPool.execute(new DeliverRunnable(resp.getMsg_Id_DELIVRD(), resp.getStat())); + } else { //用户回复会打印在这里 - log.info(""+ MsgUtils.bytesToLong(resp.getMsg_Id())); + log.info("{}", MsgUtils.bytesToLong(resp.getMsg_Id())); log.info(resp.getSrc_terminal_Id()); log.info(resp.getMsg_Content()); } diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/runnable/DeliverRunnable.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/runnable/DeliverRunnable.java new file mode 100644 index 0000000..dc52442 --- /dev/null +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/runnable/DeliverRunnable.java @@ -0,0 +1,69 @@ +package com.mashibing.smsgateway.runnable; + +import com.mashibing.common.constant.CacheConstant; +import com.mashibing.common.constant.RabbitMQConstant; +import com.mashibing.common.constant.SMSConstant; +import com.mashibing.common.pojo.StandardReport; +import com.mashibing.common.utils.CMPP2DeliverUtil; +import com.mashibing.common.utils.CMPPDeliverMapUtil; +import com.mashibing.smsgateway.client.CacheClient; +import com.mashibing.smsgateway.utils.SpringUtil; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +/** + * @author heqijun + * @ClassName: DeliverRunnable + * @Description: 短信发送报告任务 + * @date 2025/6/17 20:13 + */ + +@Slf4j +@ToString +public class DeliverRunnable implements Runnable { + + private RabbitTemplate rabbitTemplate = SpringUtil.getBeanByClass(RabbitTemplate.class); + + private CacheClient cacheClient = SpringUtil.getBeanByClass(CacheClient.class); + + private long msgId; + + private String stat; + + public DeliverRunnable(long msgId, String stat) { + this.msgId = msgId; + this.stat = stat; + } + + @Override + public void run() { + log.info("【网关模块-CMPPHandler】第二次响应,任务交给{}线程处理", Thread.currentThread().getName()); + StandardReport report = CMPPDeliverMapUtil.remove(msgId + ""); + + // 根据运营商返回的stat重新封装report + if (StringUtils.isNotBlank(stat) && stat.equals("DELIVRD")) { + // 发送成功 + report.setReportState(SMSConstant.REPORT_STATE_SUCCESS); + } else { + // 发送失败 + report.setReportState(SMSConstant.REPORT_STATE_FAILED); + report.setErrorMsg(CMPP2DeliverUtil.getResultMessage(stat)); + } + + // 发送报告 + Integer isCallback = cacheClient.hgetInteger(CacheConstant.CLIENT_BUSINESS + report.getApikey(), CacheConstant.IS_CALLBACK); + if (isCallback != null && isCallback == 1) { + String callbackUrl = cacheClient.hgetString(CacheConstant.CLIENT_BUSINESS + report.getApikey(), CacheConstant.CALLBACK_URL); + if (StringUtils.isNotBlank(callbackUrl)) { + report.setIsCallback(isCallback); + report.setCallbackUrl(callbackUrl); + rabbitTemplate.convertAndSend(RabbitMQConstant.SMS_PUSH_REPORT, report); + } + } + + // 死信队列-确保es中已经存储第一次响应时的数据 + rabbitTemplate.convertAndSend(RabbitMQConstant.SMS_GATEWAY_NORMAL_EXCHANGE, "", report); + } +} diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/runnable/SubmitRepoRunnable.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/runnable/SubmitRepoRunnable.java new file mode 100644 index 0000000..b50768a --- /dev/null +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/runnable/SubmitRepoRunnable.java @@ -0,0 +1,58 @@ +package com.mashibing.smsgateway.runnable; + +import com.mashibing.common.constant.RabbitMQConstant; +import com.mashibing.common.constant.SMSConstant; +import com.mashibing.common.pojo.StandardReport; +import com.mashibing.common.pojo.StandardSubmit; +import com.mashibing.common.utils.CMPP2ResultUtil; +import com.mashibing.common.utils.CMPPDeliverMapUtil; +import com.mashibing.common.utils.CMPPSubmitRepoMapUtil; +import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp; +import com.mashibing.smsgateway.utils.SpringUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.BeanUtils; + +/** + * @author heqijun + * @ClassName: SubmitRepoRunnable + * @Description: 短信提交应答任务 + * @date 2025/6/17 18:58 + */ + +@Slf4j +public class SubmitRepoRunnable implements Runnable { + + public static final int OK = 0; + + private CmppSubmitResp resp; + + private RabbitTemplate rabbitTemplate = SpringUtil.getBeanByClass(RabbitTemplate.class); + + public SubmitRepoRunnable(CmppSubmitResp resp) { + this.resp = resp; + } + + @Override + public void run() { + log.info("【网关模块-CMPPHandler】第一次响应,任务交给{}线程处理", Thread.currentThread().getName()); + // 根据响应结果获取submit对象 + StandardReport report = null; + StandardSubmit submit = CMPPSubmitRepoMapUtil.remove(resp.getSequenceId()); + + // 根据响应判断短信状态 + int result = resp.getResult(); + if (result != OK) { + // 失败,重新封装submit,封装之后写入日志 + submit.setReportState(SMSConstant.REPORT_STATE_FAILED); + submit.setErrorMsg(CMPP2ResultUtil.getResultMessage(result)); + } else { + // 成功,构建report对象,放入临时存储中,准备迎接第二次响应再去操作report + report = new StandardReport(); + BeanUtils.copyProperties(submit, report); + CMPPDeliverMapUtil.put(resp.getMsgId() + "", report); + } + + rabbitTemplate.convertAndSend(RabbitMQConstant.SMS_WRITE_LOG, submit); + } +} diff --git a/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/utils/SpringUtil.java b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/utils/SpringUtil.java new file mode 100644 index 0000000..f0ee5c8 --- /dev/null +++ b/beacon-smsgateway/src/main/java/com/mashibing/smsgateway/utils/SpringUtil.java @@ -0,0 +1,38 @@ +package com.mashibing.smsgateway.utils; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +/** + * @author heqijun + * @ClassName: SpringUtil + * @Description: 用于在非ioc管理的类中使用spring中的bean + * @date 2025/6/17 18:36 + */ + +@Component +public class SpringUtil implements ApplicationContextAware { + + private static ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + SpringUtil.applicationContext = applicationContext; + } + + public static T getBeanByName(String beanName, Class clazz) { + Object bean = applicationContext.getBean(beanName); + if (clazz.isInstance(bean)) { + return clazz.cast(bean); + } else { + throw new NoSuchBeanDefinitionException("No bean with name = " + beanName + " and type = " + clazz.getSimpleName() + " found"); + } + } + + public static T getBeanByClass(Class clazz) { + return applicationContext.getBean(clazz); + } +} diff --git a/beacon-strategy/src/main/java/com/mashibing/strategy/service/strategyfilter/impl/LimitHourStrategyFilter.java b/beacon-strategy/src/main/java/com/mashibing/strategy/service/strategyfilter/impl/LimitHourStrategyFilter.java index c14b671..61a550c 100644 --- a/beacon-strategy/src/main/java/com/mashibing/strategy/service/strategyfilter/impl/LimitHourStrategyFilter.java +++ b/beacon-strategy/src/main/java/com/mashibing/strategy/service/strategyfilter/impl/LimitHourStrategyFilter.java @@ -36,7 +36,7 @@ public class LimitHourStrategyFilter implements StrategyFilter { private final long LIMIT_DURATION = 60 * 60 * 1000 - 1; - private final int LIMIT_COUNT = 3; + private final int LIMIT_COUNT = 100; private static final String STRATEGY_NAME = "小时限流";