diff --git a/beacon-api/src/main/java/com/mashibing/api/filter/impl/IPCheckFilter.java b/beacon-api/src/main/java/com/mashibing/api/filter/impl/IPCheckFilter.java index 9ccefb4..0dded0e 100644 --- a/beacon-api/src/main/java/com/mashibing/api/filter/impl/IPCheckFilter.java +++ b/beacon-api/src/main/java/com/mashibing/api/filter/impl/IPCheckFilter.java @@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import java.util.List; @@ -35,7 +36,9 @@ public class IPCheckFilter implements CheckFilter { submit.setIp(ip); //2. 如果IP白名单为null,直接放行,或者包含,修改逻辑判断。 - if(ip == null || ip.contains(submit.getRealIP())){ + // 如果客户未设置IP白名单,认为客户认为什么IP都可以访问 + // 如果设置了IP白名单,才需要去做一个校验 + if(CollectionUtils.isEmpty(ip)|| ip.contains(submit.getRealIP())){ log.info("【接口模块-校验ip】 客户端请求IP合法!"); return; } diff --git a/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java b/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java index 6ceda38..ff1e8c2 100644 --- a/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java +++ b/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java @@ -24,7 +24,8 @@ public enum ExceptionEnums { ONE_MINUTE_LIMIT(-16,"1分钟限流规则生效,无法发送短信"), ONE_HOUR_LIMIT(-17,"1小时限流规则生效,无法发送短信"), NO_CHANNEL(-18,"没有选择到合适的通道!"), - SEARCH_INDEX_ERROR(-19,"添加文档信息失败!") + SEARCH_INDEX_ERROR(-19,"添加文档信息失败!"), + SEARCH_UPDATE_ERROR(-20,"修改文档信息失败!") ; private Integer code; diff --git a/beacon-common/src/main/java/com/mashibing/common/model/StandardReport.java b/beacon-common/src/main/java/com/mashibing/common/model/StandardReport.java index 2394682..3af8aa9 100644 --- a/beacon-common/src/main/java/com/mashibing/common/model/StandardReport.java +++ b/beacon-common/src/main/java/com/mashibing/common/model/StandardReport.java @@ -75,4 +75,9 @@ public class StandardReport implements Serializable { */ private Integer resendCount = 0; + /** + * 如果第一次修改操作,这里为false,如果是第二次投递,需要直接记录日志信息 + */ + private Boolean reUpdate = false; + } diff --git a/beacon-search/src/main/java/com/mashibing/search/mq/SmsUpdateLogListener.java b/beacon-search/src/main/java/com/mashibing/search/mq/SmsUpdateLogListener.java new file mode 100644 index 0000000..1982c7f --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/mq/SmsUpdateLogListener.java @@ -0,0 +1,46 @@ +package com.mashibing.search.mq; + +import com.mashibing.common.constant.RabbitMQConstants; +import com.mashibing.common.model.StandardReport; +import com.mashibing.search.service.SearchService; +import com.mashibing.search.utils.SearchUtils; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + *

TODO

+ * + * @author 郑大仙丶 + * @version V1.0.0 + * @date 2023/6/20 22:34 + */ +@Component +@Slf4j +public class SmsUpdateLogListener { + + + @Autowired + private SearchService searchService; + + @RabbitListener(queues = {RabbitMQConstants.SMS_GATEWAY_DEAD_QUEUE}) + public void consume(StandardReport report, Channel channel, Message message) throws IOException { + log.info("【搜素模块-修改日志】 接收到修改日志的消息 report = {}", report); + // 将report对象存储ThreadLocal中,方便在搜索模块中获取 + SearchUtils.set(report); + // 调用搜索模块完成的修改操作 + Map doc = new HashMap<>(); + doc.put("reportState",report.getReportState()); + searchService.update(SearchUtils.INDEX + SearchUtils.getYear(),report.getSequenceId().toString(),doc); + + // ack + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } +} diff --git a/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java index 3af2057..24a26b0 100644 --- a/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java +++ b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java @@ -4,6 +4,7 @@ import com.mashibing.common.constant.RabbitMQConstants; import com.mashibing.common.model.StandardSubmit; import com.mashibing.common.util.JsonUtil; import com.mashibing.search.service.SearchService; +import com.mashibing.search.utils.SearchUtils; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; @@ -26,7 +27,7 @@ public class SmsWriteLogListener { @Autowired private SearchService searchService; - private final String INDEX = "sms_submit_log_"; + @@ -34,15 +35,13 @@ public class SmsWriteLogListener { public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException { //1、调用搜索模块的添加方法,完成添加操作 log.info("接收到存储日志的信息 submit = {}",submit); - searchService.index(INDEX + getYear(),submit.getSequenceId().toString(), JsonUtil.obj2JSON(submit)); + searchService.index(SearchUtils.INDEX + SearchUtils.getYear(),submit.getSequenceId().toString(), JsonUtil.obj2JSON(submit)); //2、手动ack channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } - public String getYear(){ - return LocalDateTime.now().getYear() + ""; - } + } diff --git a/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java b/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java index bb9d9c3..95c5e9f 100644 --- a/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java +++ b/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java @@ -1,6 +1,7 @@ package com.mashibing.search.service; import java.io.IOException; +import java.util.Map; /** * @author zjw @@ -16,4 +17,22 @@ public interface SearchService { */ void index(String index,String id,String json) throws IOException; + /** + * 查看指定索引中的文档是否存在 + * @param index + * @param id + * @return true:代表存在 false:代表不存在 + * @throws IOException + */ + boolean exists(String index,String id) throws IOException; + + /** + * 修改文档信息 + * @param index 指定索引 + * @param id 文档id + * @param doc 要修改的key-value集合 + * @throws IOException + */ + void update(String index, String id, Map doc)throws IOException; + } diff --git a/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticsearchServiceImpl.java b/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticsearchServiceImpl.java index bb86ab4..767a2b7 100644 --- a/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticsearchServiceImpl.java +++ b/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticsearchServiceImpl.java @@ -1,19 +1,27 @@ package com.mashibing.search.service.impl; +import com.mashibing.common.constant.RabbitMQConstants; import com.mashibing.common.enums.ExceptionEnums; import com.mashibing.common.exception.SearchException; +import com.mashibing.common.model.StandardReport; import com.mashibing.search.service.SearchService; +import com.mashibing.search.utils.SearchUtils; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; +import java.util.Map; /** * @author zjw @@ -27,6 +35,14 @@ public class ElasticsearchServiceImpl implements SearchService { */ private final String CREATED = "created"; + /** + * 修改成功的result + */ + private final String UPDATED = "updated"; + + @Autowired + private RabbitTemplate rabbitTemplate; + @Autowired private RestHighLevelClient restHighLevelClient; @@ -42,7 +58,6 @@ public class ElasticsearchServiceImpl implements SearchService { //3、将request信息发送给ES服务 IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT); - //4、校验添加是否成功 String result = response.getResult().getLowercase(); if(!CREATED.equals(result)){ @@ -52,4 +67,81 @@ public class ElasticsearchServiceImpl implements SearchService { } log.info("【搜索模块-写入数据成功】 索引添加成功index = {},id = {},json = {},result = {}",index,id,json,result); } + + @Override + public boolean exists(String index, String id) throws IOException { + // 构建GetRequest,查看索引是否存在 + GetRequest request = new GetRequest(); + + // 指定索引信息,还有文档id + request.index(index); + request.id(id); + + // 基于restHighLevelClient将查询指定id的文档是否存在的请求投递过去。 + boolean exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT); + + // 直接返回信息 + return exists; + } + + @Override + public void update(String index, String id, Map doc) throws IOException { + //1、基于exists方法,查询当前文档是否存在 + boolean exists = exists(index, id); + if(!exists){ + // 当前文档不存在 + StandardReport report = SearchUtils.get(); + if(report.getReUpdate()){ + // 第二次获取投递的消息,到这已经是延迟20s了。 + log.error("【搜索模块-修改日志】 修改日志失败,report = {}",report); + }else{ + // 第一次投递,可以再次将消息仍会MQ中 + // 开始第二次消息的投递了 + report.setReUpdate(true); + rabbitTemplate.convertAndSend(RabbitMQConstants.SMS_GATEWAY_NORMAL_QUEUE,report); + } + SearchUtils.remove(); + return; + } + //2、到这,可以确认文档是存在的,直接做修改操作 + UpdateRequest request = new UpdateRequest(); + + request.index(index); + request.id(id); + request.doc(doc); + + UpdateResponse update = restHighLevelClient.update(request, RequestOptions.DEFAULT); + String result = update.getResult().getLowercase(); + if(!UPDATED.equals(result)){ + // 添加失败!! + log.error("【搜索模块-修改日志失败】 index = {},id = {},doc = {}",index,id,doc); + throw new SearchException(ExceptionEnums.SEARCH_UPDATE_ERROR); + } + log.info("【搜索模块-修改日志成功】 文档修改成功index = {},id = {},doc = {}",index,id,doc); + + } + + + + + + + + + + + + + + + + + + + + + + + + } diff --git a/beacon-search/src/main/java/com/mashibing/search/utils/SearchUtils.java b/beacon-search/src/main/java/com/mashibing/search/utils/SearchUtils.java new file mode 100644 index 0000000..6694a00 --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/utils/SearchUtils.java @@ -0,0 +1,46 @@ +package com.mashibing.search.utils; + +import com.mashibing.common.model.StandardReport; + +import java.time.LocalDateTime; + +/** + *

TODO

+ * + * @author 郑大仙丶 + * @version V1.0.0 + * @date 2023/6/20 22:54 + */ +public class SearchUtils { + + /** + * 索引前缀 + */ + public static final String INDEX = "sms_submit_log_"; + + /** + * 获取年份信息 + * @return + */ + public static String getYear(){ + return LocalDateTime.now().getYear() + ""; + } + + + // ThreadLocal操作 + private static ThreadLocal reportThreadLocal = new ThreadLocal<>(); + + + public static void set(StandardReport report){ + reportThreadLocal.set(report); + } + + public static StandardReport get(){ + return reportThreadLocal.get(); + } + + public static void remove(){ + reportThreadLocal.remove(); + } + +} diff --git a/beacon-search/src/test/java/com/mashibing/search/service/SearchServiceTest.java b/beacon-search/src/test/java/com/mashibing/search/service/SearchServiceTest.java index 4cbcd77..de284f7 100644 --- a/beacon-search/src/test/java/com/mashibing/search/service/SearchServiceTest.java +++ b/beacon-search/src/test/java/com/mashibing/search/service/SearchServiceTest.java @@ -1,5 +1,6 @@ package com.mashibing.search.service; +import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -20,4 +21,9 @@ public class SearchServiceTest { public void index() throws IOException { searchService.index("sms_submit_log_2023","3","{\"clientId\": 3}"); } + + @Test + public void exists() throws IOException { + System.out.println(searchService.exists("sms_submit_log_2023", "2349236478326478236478")); + } } \ No newline at end of file