烽火云长椿街3.27 1:47

master
DanielDeng 3 months ago
parent a1b50f82f2
commit d790b9c214

@ -1,6 +1,5 @@
package com.mashibing.api.controller;
import com.alibaba.cloud.commons.lang.StringUtils;
import com.mashibing.api.filter.CheckFilterContext;
import com.mashibing.api.form.SingleSendForm;
import com.mashibing.api.util.R;
@ -10,6 +9,7 @@ import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.common.util.SnowFlakeUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

@ -25,7 +25,16 @@ public interface RabbitMQConstants {
String SMS_PUSH_REPORT = "sms_push_report_topic";
/**
*
*
*/
String SMS_GATEWAY = "sms_gateway_topic_";
/**
*
*/
String SMS_GATEWAY_NORMAL_EXCHANGE = "sms_gateway_normal_exchange";
String SMS_GATEWAY_NORMAL_QUEUE = "sms_gateway_normal_queue";
String SMS_GATEWAY_DEAD_EXCHANGE = "sms_gateway_dead_exchange";
String SMS_GATEWAY_DEAD_QUEUE = "sms_gateway_dead_queue";
}

@ -0,0 +1,28 @@
package com.mashibing.common.enums;
import lombok.Getter;
/**
* @author dch
* @create 2024-03-27 0:29
*/
@Getter
public enum CMPP2DeliverEnums {
DELIVRD("DELIVRD","Message is delivered to destination"),
EXPIRED("EXPIRED","Message validity period has expired"),
DELETED("DELETED","Message has been deleted"),
UNDELIV("UNDELIV","Message is undeliverable"),
ACCEPTD("ACCEPTD","Message is in accepted state"),
UNKNOWN("UNKNOWN","Message is in invalid state"),
REJECTD("REJECTD","Message is in a rejected state"),
;
private String stat;
private String description;
CMPP2DeliverEnums(String stat, String description) {
this.stat = stat;
this.description = description;
}
}

@ -0,0 +1,32 @@
package com.mashibing.common.enums;
import lombok.Getter;
/**
* @author dch
* @create 2024-03-26 23:31
*/
@Getter
public enum CMPP2ResultEnums {
OK(0, "正确"),
MESSAGE_BUILD_ERROR(1, "消息体结构错误"),
COMMAND_WORD_ERROR(2, "命令字错"),
MESSAGE_SEQUENCE_ERROR(3, "消息序号重复"),
MESSAGE_LENGTH_ERROR(4, "消息长度错误"),
INCORRECT_TARIFF_CODE(5, "资费代码错误"),
EXCEEDING_MAXIMUM_MESSAGE_LENGTH(6, "超过最大信息长"),
BUSINESS_CODE_ERROR(7, "业务代码错误"),
FLOW_CONTROL_ERROR(8, "流量控制错误"),
UNKNOWN(9, "其他错误"),
;
private Integer result;
private String msg;
CMPP2ResultEnums(Integer result, String msg) {
this.result = result;
this.msg = msg;
}
}

@ -28,6 +28,11 @@ import java.time.LocalDateTime;
@AllArgsConstructor
public class StandardReport implements Serializable {
/**
* 便
*/
private String apikey;
/**
*
*/

@ -0,0 +1,28 @@
package com.mashibing.common.util;
import com.mashibing.common.enums.CMPP2DeliverEnums;
import com.mashibing.common.enums.CMPP2ResultEnums;
import java.util.HashMap;
import java.util.Map;
/**
* @author dch
* @create 2024-03-26 23:40
*/
public class CMPP2DeliverUtil {
private static Map<String,String> states = new HashMap<>();
static {
CMPP2DeliverEnums[] cmpp2DeliverEnums = CMPP2DeliverEnums.values();
for (CMPP2DeliverEnums cmpp2ResultEnum : cmpp2DeliverEnums){
states.put(cmpp2ResultEnum.getStat(),cmpp2ResultEnum.getDescription());
}
}
public static String getResultMessage(String result){
return states.get(result);
}
}

@ -0,0 +1,28 @@
package com.mashibing.common.util;
import com.mashibing.common.enums.CMPP2ResultEnums;
import com.mashibing.common.enums.MobileOperatorEnum;
import java.util.HashMap;
import java.util.Map;
/**
* @author dch
* @create 2024-03-26 23:40
*/
public class CMPP2ResultUtil {
private static Map<Integer,String> results = new HashMap<>();
static {
CMPP2ResultEnums[] cmpp2ResultEnums = CMPP2ResultEnums.values();
for (CMPP2ResultEnums cmpp2ResultEnum : cmpp2ResultEnums){
results.put(cmpp2ResultEnum.getResult(),cmpp2ResultEnum.getMsg());
}
}
public static String getResultMessage(Integer result){
return results.get(result);
}
}

@ -0,0 +1,28 @@
package com.mashibing.common.util;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.model.StandardSubmit;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author dch
* @create 2024-03-26 21:40
* CMPP
*/
public class CMPPDeliverMapUtil {
private static ConcurrentHashMap<String, StandardReport> map = new ConcurrentHashMap<>();
public static void put(String msgId, StandardReport submit) {
map.put(msgId + "", submit);
}
public static StandardReport get(String msgId) {
return map.get(msgId);
}
public static StandardReport remove(String msgId) {
return map.remove(msgId);
}
}

@ -0,0 +1,27 @@
package com.mashibing.common.util;
import com.mashibing.common.model.StandardSubmit;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author dch
* @create 2024-03-26 21:40
* CMPP
*/
public class CMPPSubmitRepoMapUtil {
private static ConcurrentHashMap<String, StandardSubmit> map = new ConcurrentHashMap<>();
public static void put(int sequence, StandardSubmit submit) {
map.put(sequence + "", submit);
}
public static StandardSubmit get(int sequence) {
return map.get(sequence + "");
}
public static StandardSubmit remove(int sequence) {
return map.remove(sequence + "");
}
}

@ -130,6 +130,9 @@ public class PushReportListener {
if (!flag) {
log.info("【推送模块-推送状态报告】 第{}次推送状态报告失败report = {}", report.getResendCount() + 1, report);
report.setResendCount(report.getResendCount() + 1);
if (report.getReportState() >= 5) {
return;
}
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE, "", report, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {

@ -61,7 +61,18 @@
<version>3.12.0</version>
</dependency>
<!-- hippo4j-client的依赖-->
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter</artifactId>
<version>1.5.0</version>
</dependency>
<!--openFeign调用缓存-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
</dependencies>

@ -1,8 +1,10 @@
package com.mashibing.smsgateway;
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
/**
* @author dch
@ -10,6 +12,8 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
*/
@EnableDiscoveryClient
@SpringBootApplication
@EnableDynamicThreadPool
@EnableFeignClients
public class SmsGatewayStarterApp {
public static void main(String[] args) {
SpringApplication.run(SmsGatewayStarterApp.class, args);

@ -0,0 +1,22 @@
package com.mashibing.smsgateway.client;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.Set;
/**
* @author dch
* @create 2024-03-21 10:14
*/
@FeignClient(value = "beacon-cache")
public interface BeaconCacheClient {
@GetMapping("/cache/hget/{key}/{field}")
Integer hgetInteger(@PathVariable(value = "key") String key, @PathVariable(value = "field") String field);
@GetMapping("/cache/hget/{key}/{field}")
String hget(@PathVariable(value = "key") String key, @PathVariable(value = "field") String field);
}

@ -1,21 +1,68 @@
package com.mashibing.smsgateway.config;
import org.springframework.amqp.core.AcknowledgeMode;
import static com.mashibing.common.constant.RabbitMQConstants.*;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author dch
* @create 2024-03-26 14:29
*
*/
//@Configuration
@Configuration
public class RabbitMQConfig {
private final int TTL = 10000;
private final String FANOUT_ROUTING_KEY = "";
//声明死信队列 需要准备普通交换机、普通队列、死信交换机、死信队列
@Bean
public Exchange normalExchange(){
return ExchangeBuilder.fanoutExchange(SMS_GATEWAY_NORMAL_EXCHANGE).build();
}
@Bean
public Queue normalQueue(){
Queue queue = QueueBuilder.durable(SMS_GATEWAY_NORMAL_QUEUE)
.withArgument("x-message-ttl",TTL)
.withArgument("x-dead-letter-exchange",SMS_GATEWAY_DEAD_EXCHANGE)
.withArgument("x-dead-letter-routing-key",FANOUT_ROUTING_KEY)
.build();
return queue;
}
@Bean
public Binding normalBinding(Exchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with("").noargs();
}
@Bean
public Exchange deadExchange(){
return ExchangeBuilder.fanoutExchange(SMS_GATEWAY_DEAD_EXCHANGE).build();
}
@Bean
public Queue deadQueue(){
return QueueBuilder.durable(SMS_GATEWAY_DEAD_QUEUE).build();
}
@Bean
public Binding deadBinging(Exchange deadExchange,Queue deadQueue){
return BindingBuilder.bind(deadQueue).to(deadExchange).with("").noargs();
}
//配置类的方式修改RabbitMQ消费的方式
/* @Bean
public SimpleRabbitListenerContainerFactory gatewayContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer){
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
@ -24,6 +71,6 @@ public class RabbitMQConfig {
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(simpleRabbitListenerContainerFactory,connectionFactory);
return simpleRabbitListenerContainerFactory;
}
}*/
}

@ -0,0 +1,44 @@
package com.mashibing.smsgateway.config;
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author dch
* @create 2024-03-26 23:13
*/
@Configuration
public class ThreadPoolConfig {
@Bean
@DynamicThreadPool
public ThreadPoolExecutor cmppSubmitPool() {
String threadPoolId = "cmpp-submit";
ThreadPoolExecutor messageConsumeDynamicExecutor = ThreadPoolBuilder.builder()
// 指定线程名称的前缀
.threadFactory(threadPoolId)
// 线程池在Hippo4j中的唯一标识
.threadPoolId(threadPoolId)
// 代表动态线程池
.dynamicPool()
.build();
return messageConsumeDynamicExecutor;
}
@Bean
@DynamicThreadPool
public ThreadPoolExecutor cmppDeliverPool() {
String threadPoolId = "cmpp-deliver";
ThreadPoolExecutor messageProduceDynamicExecutor = ThreadPoolBuilder.builder()
.threadFactory(threadPoolId)
.threadPoolId(threadPoolId)
.dynamicPool()
.build();
return messageProduceDynamicExecutor;
}
}

@ -1,10 +1,16 @@
package com.mashibing.smsgateway.mq;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.common.util.CMPPSubmitRepoMapUtil;
import com.mashibing.smsgateway.netty4.NettyClient;
import com.mashibing.smsgateway.netty4.entity.CmppSubmit;
import com.mashibing.smsgateway.netty4.utils.Command;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@ -17,20 +23,29 @@ import java.io.IOException;
@Slf4j
public class SmsGatewayListener {
@RabbitListener(queues = "${gateway.sendtopic}")
// public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException, InterruptedException {
public void consume(String submit, Channel channel, Message message) throws IOException, InterruptedException {
Thread.sleep(1000);
log.info("【短信网关模块】 接收到消息 submit = {}", submit);
@Autowired
private NettyClient nettyClient;
@RabbitListener(queues = "${gateway.sendtopic}")
public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException, InterruptedException {
log.info("【短信网关模块】 接收到消息 submit = {}",submit);
// =====================完成运营商交互,发送一次请求,接收两次响应==========================
//long deliveryTag 消息的唯一标识。每条消息都有自己的ID号用于标识该消息在channel中的顺序。当消费者接收到消息后需要调用channel.basicAck方法并传递deliveryTag来确认消息的处理。
//boolean multiple 是否批量确认消息当传false时只确认当前 deliveryTag对应的消息;当传true时会确认当前及之前所有未确认的消息。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//1、获取需要的核心属性
String srcNumber = submit.getSrcNumber();
String mobile = submit.getMobile();
String text = submit.getText();
// 这个序列是基于++实现的当取值达到MAX时会被重置这个值是可以重复利用的。
int sequence = MsgUtils.getSequence();
//2、声明发送短息时需要的CMPPSubmit对象
CmppSubmit cmppSubmit = new CmppSubmit(Command.CMPP2_VERSION,srcNumber,sequence,mobile,text);
//3、将submit对象做一个临时存储在运营商第一次响应时可以获取到。
// 如果怕出问题服务器宕机数据丢失可以上Redis~~~
CMPPSubmitRepoMapUtil.put(sequence,submit);
//4、和运营商交互发送短信
nettyClient.submit(cmppSubmit);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}

@ -4,16 +4,23 @@ package com.mashibing.smsgateway.netty4;
import com.mashibing.smsgateway.netty4.entity.CmppDeliver;
import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
import com.mashibing.smsgateway.runnable.DeliverRunnable;
import com.mashibing.smsgateway.runnable.SubmitRepoRunnable;
import com.mashibing.smsgateway.util.SpringUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.nio.ch.ThreadPool;
import java.util.concurrent.ThreadPoolExecutor;
/**
* handler,
*/
public class CMPPHandler extends SimpleChannelInboundHandler {
private final static Logger log = LoggerFactory.getLogger(CMPPHandler.class);
@Override
@ -25,6 +32,10 @@ public class CMPPHandler extends SimpleChannelInboundHandler {
log.info("----自增id"+resp.getSequenceId());
log.info("----状态:"+ resp.getResult());
log.info("----第一次响应:"+resp.getMsgId());
// 将封装好的任务扔到线程池中,执行即可
//两个线程池-1
ThreadPoolExecutor cmppSubmitPool = (ThreadPoolExecutor) SpringUtil.getBeanByName("cmppSubmitPool");
cmppSubmitPool.execute(new SubmitRepoRunnable(resp));
}
if (msg instanceof CmppDeliver){
@ -36,6 +47,9 @@ public class CMPPHandler extends SimpleChannelInboundHandler {
log.info("----第二次响应:"+resp.getMsg_Id_DELIVRD());
log.info("----手机号:"+resp.getDest_terminal_Id());
log.info("----状态:"+resp.getStat());
//两个线程池-2
ThreadPoolExecutor cmppDeliverPool = (ThreadPoolExecutor) SpringUtil.getBeanByName("cmppDeliverPool");
cmppDeliverPool.execute(new DeliverRunnable(resp.getMsg_Id_DELIVRD(),resp.getStat()));
} else {
//用户回复会打印在这里
log.info(""+ MsgUtils.bytesToLong(resp.getMsg_Id()));

@ -0,0 +1,71 @@
package com.mashibing.smsgateway.runnable;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.constant.SmsConstant;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.util.CMPP2DeliverUtil;
import com.mashibing.common.util.CMPPDeliverMapUtil;
import com.mashibing.smsgateway.client.BeaconCacheClient;
import com.mashibing.smsgateway.util.SpringUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @author dch
* @create 2024-03-27 0:26
*/
public class DeliverRunnable implements Runnable {
private RabbitTemplate rabbitTemplate = SpringUtil.getBeanByClass(RabbitTemplate.class);
private BeaconCacheClient cacheClient = SpringUtil.getBeanByClass(BeaconCacheClient.class);
private final String DELIVRD = "DELIVRD";
private long msgId;
private String stat;
public DeliverRunnable(long msgId, String stat) {
this.msgId = msgId;
this.stat = stat;
}
@Override
public void run() {
//1、基于msgId拿到临时存储的Report对象
StandardReport report = CMPPDeliverMapUtil.remove(msgId + "");
//2、确认当前短信发送的最终状态
if(!StringUtils.isEmpty(stat) && stat.equals(DELIVRD)){
// 短信发送成功
report.setReportState(SmsConstant.REPORT_SUCCESS);
}else{
// 短信发送失败
report.setReportState(SmsConstant.REPORT_FAIL);
report.setErrorMsg(CMPP2DeliverUtil.getResultMessage(stat));
}
//3、客户状态报告推送让网关模块查询缓存当前客户是否需要状态报告推送
// 查询当前客户的isCallback
Integer isCallback = cacheClient.hgetInteger(CacheConstant.CLIENT_BUSINESS + report.getApikey(), "isCallback");
if(isCallback == 1){
// 如果需要回调,再查询客户的回调地址
String callbackUrl = cacheClient.hget(CacheConstant.CLIENT_BUSINESS + report.getApikey(), "callbackUrl");
// 如果回调地址不为空。
if(!StringUtils.isEmpty(callbackUrl)){
// 封装客户的报告推送的信息开始封装StandardReport
report.setIsCallback(isCallback);
report.setCallbackUrl(callbackUrl);
// 发送消息到RabbitMQ
rabbitTemplate.convertAndSend(RabbitMQConstants.SMS_PUSH_REPORT,report);
}
}
//4、发送消息让搜索模块对之前写入的信息做修改这里需要做一个死信队列延迟10s发送修改es信息的消息
// 声明好具体的交换机和队列后直接发送report到死信队列即可
rabbitTemplate.convertAndSend(RabbitMQConstants.SMS_GATEWAY_NORMAL_EXCHANGE,"",report);
}
}

@ -0,0 +1,55 @@
package com.mashibing.smsgateway.runnable;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.constant.SmsConstant;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.common.util.CMPP2ResultUtil;
import com.mashibing.common.util.CMPPDeliverMapUtil;
import com.mashibing.common.util.CMPPSubmitRepoMapUtil;
import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp;
import com.mashibing.smsgateway.util.SpringUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
/**
* @author dch
* @create 2024-03-26 23:59
*/
public class SubmitRepoRunnable implements Runnable {
private RabbitTemplate rabbitTemplate = SpringUtil.getBeanByClass(RabbitTemplate.class);
private CmppSubmitResp submitResp;
private final int OK = 0;
public SubmitRepoRunnable(CmppSubmitResp submitResp) {
this.submitResp = submitResp;
}
@Override
public void run() {
StandardReport report = null;
//1、拿到自增ID并且从ConcurrentHashMap中获取到存储的submit
StandardSubmit submit = CMPPSubmitRepoMapUtil.remove(submitResp.getSequenceId());
//2、根据运营商返回的result确认短信状态并且封装submit
int result = submitResp.getResult();
if (result != OK) {
// 到这,说明运营商的提交应答中回馈的失败的情况
String resultMessage = CMPP2ResultUtil.getResultMessage(result);
submit.setReportState(SmsConstant.REPORT_FAIL);
submit.setErrorMsg(resultMessage);
} else {
// 如果没进到if中说明运营商已经正常的接收了发送短信的任务这边完成3操作
//3、将submit封装为Report临时存储以便运营商返回状态码时可以再次获取到信息
// 这里没有对其他信息做封装
report = new StandardReport();
BeanUtils.copyProperties(submit, report);
CMPPDeliverMapUtil.put(submitResp.getMsgId() + "",report);
}
//4、将封装好的submit直接扔RabbitMQ中让搜索模块记录信息
rabbitTemplate.convertAndSend(RabbitMQConstants.SMS_WRITE_LOG,submit);
}
}

@ -0,0 +1,29 @@
package com.mashibing.smsgateway.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* @author dch
* @create 2024-03-26 23:56
*/
@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtil.applicationContext = applicationContext;
}
public static Object getBeanByName(String beanName){
return SpringUtil.applicationContext.getBean(beanName);
}
public static <T>T getBeanByClass(Class<T> clazz){
return SpringUtil.applicationContext.getBean(clazz);
}
}

@ -65,6 +65,7 @@
<version>5.8.12</version>
</dependency>
</dependencies>
</project>

Loading…
Cancel
Save