小院 3.26 17:34

master
msb_221930 3 months ago
parent d790b9c214
commit cdb77112b3

@ -10,6 +10,9 @@ import com.mashibing.common.model.StandardSubmit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* @author dch
@ -30,11 +33,11 @@ public class IPCheckFilter implements CheckFilter {
public void check(StandardSubmit submit) {
log.info("【接口模块-校验ip】 校验ing…………");
//1. 根据CacheClient根据客户的apikey以及ipAddress去查询客户的IP白名单
String ip = cacheClient.hgetString(CacheConstant.CLIENT_BUSINESS + submit.getApikey(), IP_ADDRESS);
List<String> ip = (List<String>) cacheClient.hget(CacheConstant.CLIENT_BUSINESS + submit.getApikey(), IP_ADDRESS);
submit.setIp(ip);
//2. 如果IP白名单为null直接放行
if(StringUtils.isEmpty(ip) || ip.contains(submit.getRealIP())){
if (CollectionUtils.isEmpty(ip) || ip.contains(submit.getRealIP())) {
log.info("【接口模块-校验ip】 客户端请求IP合法");
return;
}
@ -43,4 +46,4 @@ public class IPCheckFilter implements CheckFilter {
log.info("【接口模块-校验ip】 请求的ip不在白名单内");
throw new ApiException(ExceptionEnums.IP_NOT_WHITE);
}
}
}

@ -25,7 +25,8 @@ public enum ExceptionEnums {
ONE_MINUTE_LIMIT(-16,"1分钟限流规则生效无法发送短信"),
ONE_HOUR_LIMIT(-17,"1小时限流规则生效无法发送短信"),
NO_CHANNEL(-18,"没有选择到合适的通道"),
SEARCH_INDEX_ERROR(-19,"添加ES文档信息失败")
SEARCH_INDEX_ERROR(-19,"添加ES文档信息失败"),
SEARCH_UPDATE_ERROR(-20,"修改ES文档失败"),
;
private Integer code;

@ -84,4 +84,9 @@ public class StandardReport implements Serializable {
*/
private Integer resendCount = 0;
}
/**
* false
*/
private Boolean reUpdate = false;
}

@ -11,6 +11,7 @@ import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
/**
* --POJO
@ -36,7 +37,7 @@ public class StandardSubmit implements Serializable {
/**
* ip
*/
private String ip;
private List<String> ip;
/**
* uid
@ -139,4 +140,4 @@ public class StandardSubmit implements Serializable {
*/
private Long oneHourLimitMilli;
}
}

@ -25,23 +25,23 @@ public class RabbitMQConfig {
private static final String DELAYED_ROUTING_TYPE_FANOUT = "fanout";
@Bean
public Exchange delayedExchange(){
public Exchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
//延迟交换机的路由类型----fanout
args.put(DELAYED_ROUTING_TYPE_KEY,DELAYED_ROUTING_TYPE_FANOUT);
Exchange delayedExchange = new CustomExchange(DELAYED_EXCHANGE,DELAYED_EXCHANGE_TYPE,false,false,args);
args.put(DELAYED_ROUTING_TYPE_KEY, DELAYED_ROUTING_TYPE_FANOUT);
Exchange delayedExchange = new CustomExchange(DELAYED_EXCHANGE, DELAYED_EXCHANGE_TYPE, false, false, args);
return delayedExchange;
}
@Bean
public Queue delayedQueue(){
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE).build();
}
@Bean
public Binding delayedBinding(Exchange delayedExchange, Queue delayedQueue){
public Binding delayedBinding(Exchange delayedExchange, Queue delayedQueue) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("").noargs();
}
}
}

@ -130,7 +130,7 @@ public class PushReportListener {
if (!flag) {
log.info("【推送模块-推送状态报告】 第{}次推送状态报告失败report = {}", report.getResendCount() + 1, report);
report.setResendCount(report.getResendCount() + 1);
if (report.getReportState() >= 5) {
if (report.getResendCount() >= 5) {
return;
}
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE, "", report, new MessagePostProcessor() {

@ -0,0 +1,42 @@
package com.mashibing.search.mq;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.model.StandardReport;
import com.mashibing.search.service.SearchService;
import com.mashibing.search.utils.SearchUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author dch
* @create 2024-03-27 10:58
*/
@Component
@Slf4j
public class SmsUpdateLogListener {
@Autowired
private SearchService searchService;
@RabbitListener(queues = {RabbitMQConstants.SMS_GATEWAY_DEAD_QUEUE})
public void consume(StandardReport report, Channel channel, Message message) throws IOException {
log.info("【搜素模块-修改日志】 接收到修改日志的消息 report = {}", report);
// 将report对象存储ThreadLocal中方便在搜索模块中获取
SearchUtils.set(report);
// 调用搜索模块完成的修改操作
Map<String,Object> doc = new HashMap<>();
doc.put("reportState",report.getReportState());
searchService.update(SearchUtils.INDEX + SearchUtils.getYear(),report.getSequenceId().toString(),doc);
// ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}

@ -1,6 +1,7 @@
package com.mashibing.search.service;
import java.io.IOException;
import java.util.Map;
/**
* @author dch
@ -14,5 +15,17 @@ public interface SearchService {
* @param id id
* @param json
*/
void index(String index, String id, String json) throws IOException;
boolean exists(String index, String id) throws IOException;
/**
*
* @param index
* @param id id
* @param doc key-value
* @throws IOException
*/
void update(String index, String id, Map<String, Object> doc) throws IOException;
}

@ -1,19 +1,27 @@
package com.mashibing.search.service.impl;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.SearchException;
import com.mashibing.common.model.StandardReport;
import com.mashibing.search.service.SearchService;
import com.mashibing.search.utils.SearchUtils;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Map;
/**
* @author dch
@ -27,9 +35,14 @@ public class ElasticsearchServiceImpl implements SearchService {
*/
private final String CREATED = "created";
private final String UPDATED = "updated";
@Autowired
private RestHighLevelClient restHighLevelClient;
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void index(String index, String id, String json) throws IOException {
//1、构建插入数据的Request
@ -52,4 +65,58 @@ public class ElasticsearchServiceImpl implements SearchService {
}
log.info("【搜索模块-写入数据成功】 索引添加成功index = {},id = {},json = {},result = {}",index,id,json,result);
}
}
@Override
public boolean exists(String index, String id) throws IOException {
// 构建GetRequest查看索引是否存在
GetRequest request = new GetRequest();
// 指定索引信息还有文档id
request.index(index);
request.id(id);
// 基于restHighLevelClient将查询指定id的文档是否存在的请求投递过去。
boolean exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT);
// 直接返回信息
return exists;
}
@Override
public void update(String index, String id, Map<String, Object> doc) throws IOException {
//1、基于exists方法查询当前文档是否存在
boolean exists = exists(index, id);
if(!exists){
// 当前文档不存在
StandardReport report = SearchUtils.get();
if(report.getReUpdate()){
// 第二次获取投递的消息到这已经是延迟20s了。
log.error("【搜索模块-修改日志】 修改日志失败report = {}",report);
}else{
// 第一次投递可以再次将消息仍会MQ中
// 开始第二次消息的投递了
report.setReUpdate(true);
rabbitTemplate.convertAndSend(RabbitMQConstants.SMS_GATEWAY_NORMAL_QUEUE,report);
}
SearchUtils.remove();
return;
}
//2、到这可以确认文档是存在的直接做修改操作
UpdateRequest request = new UpdateRequest();
request.index(index);
request.id(id);
request.doc(doc);
UpdateResponse update = restHighLevelClient.update(request, RequestOptions.DEFAULT);
String result = update.getResult().getLowercase();
if(!UPDATED.equals(result)){
// 添加失败!!
log.error("【搜索模块-修改日志失败】 index = {},id = {},doc = {}",index,id,doc);
throw new SearchException(ExceptionEnums.SEARCH_UPDATE_ERROR);
}
log.info("【搜索模块-修改日志成功】 文档修改成功index = {},id = {},doc = {}",index,id,doc);
}
}

@ -0,0 +1,43 @@
package com.mashibing.search.utils;
import com.mashibing.common.model.StandardReport;
import org.elasticsearch.search.SearchService;
import org.springframework.beans.factory.annotation.Autowired;
import java.time.LocalDateTime;
/**
* @author dch
* @create 2024-03-27 16:26
*/
public class SearchUtils {
/**
*
*/
public final static String INDEX = "sm_submit_log_";
/**
*
* @return
*/
public static String getYear(){
return LocalDateTime.now().getYear() + "";
}
private static ThreadLocal<StandardReport> reportThreadLocal = new ThreadLocal<>();
public static void set(StandardReport report){
reportThreadLocal.set(report);
}
public static StandardReport get(){
return reportThreadLocal.get();
}
public static void remove(){
reportThreadLocal.remove();
}
}

@ -25,4 +25,9 @@ public class SearchServiceTest {
public void index() throws IOException {
searchService.index("sms_submit_log_2024","1","{\"clientId\":1}");
}
}
@Test
public void exists() throws IOException {
searchService.exists("sms_submit_log_2024","181724997830574080");
}
}

@ -1,6 +1,11 @@
package com.mashibing.test.entity;
import org.apache.commons.lang.StringUtils;
import java.util.Arrays;
import java.util.List;
public class ClientBusiness {
private long id;
@ -50,8 +55,12 @@ public class ClientBusiness {
}
public String getIpAddress() {
return ipAddress;
public List<String> getIpAddress() {
String ips = ipAddress;
if(!StringUtils.isEmpty(ips)){
return Arrays.asList(ips.split(","));
}
return null;
}
public void setIpAddress(String ipAddress) {

Loading…
Cancel
Save