使用动态线程池处理CMPP两次响应实现+搜索模块更新日志功能实现

main
heqijun 3 months ago
parent 47d9d86be3
commit 4ae0e806fc

@ -32,4 +32,12 @@ public interface RabbitMQConstant {
*
*/
String SMS_GATEWAY = "sms_gateway_topic_";
/**
*
*/
String SMS_GATEWAY_NORMAL_EXCHANGE = "sms_gateway_normal_exchange";
String SMS_GATEWAY_NORMAL_QUEUE = "sms_gateway_normal_queue";
String SMS_GATEWAY_DEAD_EXCHANGE = "sms_gateway_dead_exchange";
String SMS_GATEWAY_DEAD_QUEUE = "sms_gateway_dead_queue";
}

@ -0,0 +1,16 @@
package com.mashibing.common.constant;
/**
* @author heqijun
* @ClassName: SearchConstant
* @Description:
* @date 2025/6/18 20:30
*/
public interface SearchConstant {
/**
*
*/
String INDEX_PREFIX = "sms_submit_log_";
}

@ -27,7 +27,9 @@ public enum ExceptionEnums {
LIMIT_MINUTE(-16, "达到分钟限流阈值!!"),
LIMIT_HOUR(-17, "达到小时限流阈值!!"),
NO_CHANNEL(-18, "没有可用通道!!!"),
SEARCH_INDEX_ERROR(-20, "es添加一行文档失败");
SEARCH_INDEX_ERROR(-20, "es添加一行文档失败"),
SEARCH_UPDATE_ERROR(-21, "es更新文档失败"),
;
private final int code;

@ -56,4 +56,10 @@ public class StandardReport implements Serializable {
@Description("推送报告推送次数")
private Integer pushTimes = 0;
@Description("apikey")
private String apikey;
@Description("日志更新次数")
private Integer updateTimes = 0;
}

@ -15,8 +15,8 @@ public class CMPPDeliverMapUtil {
private static ConcurrentHashMap<String, StandardReport> tempReport = new ConcurrentHashMap<>();
public static void put(String msgId, StandardReport submit) {
tempReport.put(msgId, submit);
public static void put(String msgId, StandardReport report) {
tempReport.put(msgId, report);
}
public static StandardReport get(String msgId) {

@ -0,0 +1,67 @@
package com.mashibing.search.mq;
import com.mashibing.common.constant.RabbitMQConstant;
import com.mashibing.common.constant.SearchConstant;
import com.mashibing.common.pojo.StandardReport;
import com.mashibing.search.service.SearchService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
/**
* @author heqijun
* @ClassName: SmsUpdateLogListener
* @Description:
* @date 2025/6/18 18:48
*/
@Slf4j
@Component
public class SmsUpdateLogListener {
@Autowired
SearchService searchService;
@Autowired
RabbitTemplate rabbitTemplate;
private static final int UPDATE_TIMES_MAX = 3;
@RabbitListener(queues = {RabbitMQConstant.SMS_GATEWAY_DEAD_QUEUE})
public void consume(StandardReport report, Channel channel, Message message) throws IOException {
log.info("【搜索模块-更新日志】接收到更新日志的信息report={}", report);
// 投递次数+1
report.setUpdateTimes(report.getUpdateTimes() + 1);
String sequenceId = report.getSequenceId().toString();
int year = LocalDateTime.now().getYear();
boolean exist = searchService.exist(SearchConstant.INDEX_PREFIX + year, sequenceId);
if (exist) {
// 存在,更新日志
Map<String, Object> map = new HashMap<>();
map.put("reportState", report.getReportState());
searchService.update(SearchConstant.INDEX_PREFIX + year, sequenceId, map);
// log.info("【搜索模块-更新日志】更新成功!!!更新字段:{}report={}", map, report);
} else {
// 不存在
if (report.getUpdateTimes() >= UPDATE_TIMES_MAX) {
// 达到最大投递次数,不扔回队列,而是直接记录日志
log.error("【搜索模块-更新日志】已达到最大投递次数更新失败report={}", report);
} else {
// 将消息扔回队列中
log.info("【搜索模块-更新日志】ES中不存在当前日志投递次数{}report={}", report.getUpdateTimes(), report);
rabbitTemplate.convertAndSend(RabbitMQConstant.SMS_GATEWAY_NORMAL_EXCHANGE,"", report);
}
}
//手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

@ -1,6 +1,7 @@
package com.mashibing.search.mq;
import com.mashibing.common.constant.RabbitMQConstant;
import com.mashibing.common.constant.SearchConstant;
import com.mashibing.common.pojo.StandardSubmit;
import com.mashibing.common.utils.JsonUtil;
import com.mashibing.search.service.SearchService;
@ -28,17 +29,16 @@ public class SmsWriteLogListener {
@Autowired
SearchService searchService;
private final String INDEX = "sms_submit_log_";
@RabbitListener(queues = {RabbitMQConstant.SMS_WRITE_LOG})
public void process(StandardSubmit submit, Channel channel, Message message) throws IOException {
log.info("接收到存储日志的信息submit={}", submit);
log.info("【搜索模块-写入日志】接收到写入日志的信息submit={}", submit);
String sequenceId = submit.getSequenceId().toString();
String json = JsonUtil.objToJson(submit);
String year = LocalDateTime.now().getYear() + "";
searchService.index(INDEX + year, sequenceId, json);
searchService.index(SearchConstant.INDEX_PREFIX + year, sequenceId, json);
//手动ack

@ -1,6 +1,7 @@
package com.mashibing.search.service;
import java.io.IOException;
import java.util.Map;
/**
* @author heqijun
@ -14,9 +15,28 @@ public interface SearchService {
/**
* es
*
* @param index
* @param index
* @param id id
* @param json
* @param json json
*/
void index(String index, String id, String json) throws IOException;
/**
*
*
* @param index
* @param id id
* @return true/false
* @throws IOException
*/
boolean exist(String index, String id) throws IOException;
/**
* @param index
* @param id id
* @param doc
* @throws IOException
*/
void update(String index, String id, Map<String, Object> doc) throws IOException;
}

@ -4,8 +4,11 @@ import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.SearchException;
import com.mashibing.search.service.SearchService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
@ -13,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Map;
/**
* @author heqijun
@ -27,6 +31,8 @@ public class ElasticSearchServiceImpl implements SearchService {
private static String CREATED = "created";
private static String UPDATED = "updated";
@Autowired
RestHighLevelClient client;
@ -49,4 +55,20 @@ public class ElasticSearchServiceImpl implements SearchService {
log.info("【搜索模块-写入数据】成功index = {},id = {},json = {},result = {}", index, id, json, result);
}
@Override
public boolean exist(String index, String id) throws IOException {
return client.exists(new GetRequest().index(index).id(id), RequestOptions.DEFAULT);
}
@Override
public void update(String index, String id, Map<String, Object> doc) throws IOException {
UpdateResponse updateResponse = client.update(new UpdateRequest().index(index).id(id).doc(doc), RequestOptions.DEFAULT);
String result = updateResponse.getResult().getLowercase();
if (!UPDATED.equals(result)) {
log.error("【搜索模块-更新数据】失败index = {},id = {},doc = {},result = {}", index, id, doc, result);
throw new SearchException(ExceptionEnums.SEARCH_UPDATE_ERROR);
}
log.info("【搜索模块-更新数据】成功index = {},id = {},doc = {},result = {}", index, id, doc, result);
}
}

@ -0,0 +1,15 @@
package com.mashibing.smsgateway.client;
import com.mashibing.common.clients.BeaconCacheClient;
import org.springframework.cloud.openfeign.FeignClient;
/**
* @author heqijun
* @ClassName: CacheClient
* @Description: openFeignClient
* @date 2025/6/17 20:28
*/
@FeignClient("beacon-cache")
public interface CacheClient extends BeaconCacheClient {
}

@ -1,9 +1,15 @@
package com.mashibing.smsgateway.config;
import org.springframework.amqp.core.AcknowledgeMode;
import com.mashibing.common.constant.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.mashibing.common.constant.RabbitMQConstant.SMS_GATEWAY_DEAD_EXCHANGE;
import static com.mashibing.common.constant.RabbitMQConstant.SMS_GATEWAY_DEAD_QUEUE;
/**
* @author heqijun
@ -12,18 +18,46 @@ import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainer
* @date 2025/6/13 16:04
*/
//@Configuration
@Configuration
public class RabbitMQConfig {
// @Bean
public SimpleRabbitListenerContainerFactory gatewayContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConcurrentConsumers(5);
simpleRabbitListenerContainerFactory.setPrefetchCount(10);
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(simpleRabbitListenerContainerFactory, connectionFactory);
return simpleRabbitListenerContainerFactory;
private final int TTL = 10000;
private final String FANOUT_ROUTING_KEY = "";
// 声明死信队列,需要准备普通交换机,普通队列,死信交换机,死信队列
@Bean
public Exchange normalExchange() {
return ExchangeBuilder.fanoutExchange(RabbitMQConstant.SMS_GATEWAY_NORMAL_EXCHANGE).build();
}
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(RabbitMQConstant.SMS_GATEWAY_NORMAL_QUEUE)
.withArgument("x-message-ttl", TTL)
.withArgument("x-dead-letter-exchange", RabbitMQConstant.SMS_GATEWAY_DEAD_EXCHANGE)
.withArgument("x-dead-letter-routing-key", FANOUT_ROUTING_KEY)
.build();
}
@Bean
public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("").noargs();
}
@Bean
public Exchange deadExchange() {
return ExchangeBuilder.fanoutExchange(SMS_GATEWAY_DEAD_EXCHANGE).build();
}
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(SMS_GATEWAY_DEAD_QUEUE).build();
}
@Bean
public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with("").noargs();
}
}

@ -1,6 +1,13 @@
package com.mashibing.smsgateway.netty4;
import com.mashibing.common.constant.SMSConstant;
import com.mashibing.common.pojo.StandardReport;
import com.mashibing.common.pojo.StandardSubmit;
import com.mashibing.common.utils.CMPP2ResultUtil;
import com.mashibing.common.utils.CMPPSubmitRepoMapUtil;
import com.mashibing.smsgateway.runnable.DeliverRunnable;
import com.mashibing.smsgateway.runnable.SubmitRepoRunnable;
import com.mashibing.smsgateway.utils.SpringUtil;
import com.mashibing.smsgateway.netty4.entity.CmppDeliver;
import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp;
import com.mashibing.smsgateway.netty4.utils.MsgUtils;
@ -8,12 +15,15 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import java.util.concurrent.ThreadPoolExecutor;
/**
* handler,
*/
public class CMPPHandler extends SimpleChannelInboundHandler {
private final static Logger log = LoggerFactory.getLogger(CMPPHandler.class);
@Override
@ -22,9 +32,14 @@ public class CMPPHandler extends SimpleChannelInboundHandler {
if (msg instanceof CmppSubmitResp){
CmppSubmitResp resp=(CmppSubmitResp)msg;
log.info("-------------接收到短信提交应答-------------");
log.info("----自增id"+resp.getSequenceId());
log.info("----状态:"+ resp.getResult());
log.info("----第一次响应:"+resp.getMsgId());
log.info("----自增id{}", resp.getSequenceId());
log.info("----状态:{}", resp.getResult());
log.info("----第一次响应:{}", resp.getMsgId());
ThreadPoolExecutor cmppSubmitPool = SpringUtil.getBeanByName("cmppSubmitPool", ThreadPoolExecutor.class);
// 将任务提交给线程池处理
cmppSubmitPool.execute(new SubmitRepoRunnable(resp));
}
if (msg instanceof CmppDeliver){
@ -33,12 +48,17 @@ public class CMPPHandler extends SimpleChannelInboundHandler {
if (resp.getRegistered_Delivery() == 1) {
// 如果是状态报告的话
log.info("-------------状态报告---------------");
log.info("----第二次响应:"+resp.getMsg_Id_DELIVRD());
log.info("----手机号:"+resp.getDest_terminal_Id());
log.info("----状态:"+resp.getStat());
log.info("----第二次响应:{}", resp.getMsg_Id_DELIVRD());
log.info("----手机号:{}", resp.getDest_terminal_Id());
log.info("----状态:{}", resp.getStat());
ThreadPoolExecutor cmppDeliverPool = SpringUtil.getBeanByName("cmppDeliverPool", ThreadPoolExecutor.class);
cmppDeliverPool.execute(new DeliverRunnable(resp.getMsg_Id_DELIVRD(), resp.getStat()));
} else {
//用户回复会打印在这里
log.info(""+ MsgUtils.bytesToLong(resp.getMsg_Id()));
log.info("{}", MsgUtils.bytesToLong(resp.getMsg_Id()));
log.info(resp.getSrc_terminal_Id());
log.info(resp.getMsg_Content());
}

@ -0,0 +1,69 @@
package com.mashibing.smsgateway.runnable;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.constant.RabbitMQConstant;
import com.mashibing.common.constant.SMSConstant;
import com.mashibing.common.pojo.StandardReport;
import com.mashibing.common.utils.CMPP2DeliverUtil;
import com.mashibing.common.utils.CMPPDeliverMapUtil;
import com.mashibing.smsgateway.client.CacheClient;
import com.mashibing.smsgateway.utils.SpringUtil;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* @author heqijun
* @ClassName: DeliverRunnable
* @Description:
* @date 2025/6/17 20:13
*/
@Slf4j
@ToString
public class DeliverRunnable implements Runnable {
private RabbitTemplate rabbitTemplate = SpringUtil.getBeanByClass(RabbitTemplate.class);
private CacheClient cacheClient = SpringUtil.getBeanByClass(CacheClient.class);
private long msgId;
private String stat;
public DeliverRunnable(long msgId, String stat) {
this.msgId = msgId;
this.stat = stat;
}
@Override
public void run() {
log.info("【网关模块-CMPPHandler】第二次响应任务交给{}线程处理", Thread.currentThread().getName());
StandardReport report = CMPPDeliverMapUtil.remove(msgId + "");
// 根据运营商返回的stat重新封装report
if (StringUtils.isNotBlank(stat) && stat.equals("DELIVRD")) {
// 发送成功
report.setReportState(SMSConstant.REPORT_STATE_SUCCESS);
} else {
// 发送失败
report.setReportState(SMSConstant.REPORT_STATE_FAILED);
report.setErrorMsg(CMPP2DeliverUtil.getResultMessage(stat));
}
// 发送报告
Integer isCallback = cacheClient.hgetInteger(CacheConstant.CLIENT_BUSINESS + report.getApikey(), CacheConstant.IS_CALLBACK);
if (isCallback != null && isCallback == 1) {
String callbackUrl = cacheClient.hgetString(CacheConstant.CLIENT_BUSINESS + report.getApikey(), CacheConstant.CALLBACK_URL);
if (StringUtils.isNotBlank(callbackUrl)) {
report.setIsCallback(isCallback);
report.setCallbackUrl(callbackUrl);
rabbitTemplate.convertAndSend(RabbitMQConstant.SMS_PUSH_REPORT, report);
}
}
// 死信队列-确保es中已经存储第一次响应时的数据
rabbitTemplate.convertAndSend(RabbitMQConstant.SMS_GATEWAY_NORMAL_EXCHANGE, "", report);
}
}

@ -0,0 +1,58 @@
package com.mashibing.smsgateway.runnable;
import com.mashibing.common.constant.RabbitMQConstant;
import com.mashibing.common.constant.SMSConstant;
import com.mashibing.common.pojo.StandardReport;
import com.mashibing.common.pojo.StandardSubmit;
import com.mashibing.common.utils.CMPP2ResultUtil;
import com.mashibing.common.utils.CMPPDeliverMapUtil;
import com.mashibing.common.utils.CMPPSubmitRepoMapUtil;
import com.mashibing.smsgateway.netty4.entity.CmppSubmitResp;
import com.mashibing.smsgateway.utils.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
/**
* @author heqijun
* @ClassName: SubmitRepoRunnable
* @Description:
* @date 2025/6/17 18:58
*/
@Slf4j
public class SubmitRepoRunnable implements Runnable {
public static final int OK = 0;
private CmppSubmitResp resp;
private RabbitTemplate rabbitTemplate = SpringUtil.getBeanByClass(RabbitTemplate.class);
public SubmitRepoRunnable(CmppSubmitResp resp) {
this.resp = resp;
}
@Override
public void run() {
log.info("【网关模块-CMPPHandler】第一次响应任务交给{}线程处理", Thread.currentThread().getName());
// 根据响应结果获取submit对象
StandardReport report = null;
StandardSubmit submit = CMPPSubmitRepoMapUtil.remove(resp.getSequenceId());
// 根据响应判断短信状态
int result = resp.getResult();
if (result != OK) {
// 失败重新封装submit封装之后写入日志
submit.setReportState(SMSConstant.REPORT_STATE_FAILED);
submit.setErrorMsg(CMPP2ResultUtil.getResultMessage(result));
} else {
// 成功构建report对象放入临时存储中准备迎接第二次响应再去操作report
report = new StandardReport();
BeanUtils.copyProperties(submit, report);
CMPPDeliverMapUtil.put(resp.getMsgId() + "", report);
}
rabbitTemplate.convertAndSend(RabbitMQConstant.SMS_WRITE_LOG, submit);
}
}

@ -0,0 +1,38 @@
package com.mashibing.smsgateway.utils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* @author heqijun
* @ClassName: SpringUtil
* @Description: ioc使springbean
* @date 2025/6/17 18:36
*/
@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtil.applicationContext = applicationContext;
}
public static <T> T getBeanByName(String beanName, Class<T> clazz) {
Object bean = applicationContext.getBean(beanName);
if (clazz.isInstance(bean)) {
return clazz.cast(bean);
} else {
throw new NoSuchBeanDefinitionException("No bean with name = " + beanName + " and type = " + clazz.getSimpleName() + " found");
}
}
public static <T> T getBeanByClass(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
}

@ -36,7 +36,7 @@ public class LimitHourStrategyFilter implements StrategyFilter {
private final long LIMIT_DURATION = 60 * 60 * 1000 - 1;
private final int LIMIT_COUNT = 3;
private final int LIMIT_COUNT = 100;
private static final String STRATEGY_NAME = "小时限流";

Loading…
Cancel
Save