完成搜索模块的整体修改操作

master
郑大仙丶 2 years ago
parent e5d154631d
commit 3d2bb25c7d

@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
@ -35,7 +36,9 @@ public class IPCheckFilter implements CheckFilter {
submit.setIp(ip);
//2. 如果IP白名单为null直接放行或者包含修改逻辑判断。
if(ip == null || ip.contains(submit.getRealIP())){
// 如果客户未设置IP白名单认为客户认为什么IP都可以访问
// 如果设置了IP白名单才需要去做一个校验
if(CollectionUtils.isEmpty(ip)|| ip.contains(submit.getRealIP())){
log.info("【接口模块-校验ip】 客户端请求IP合法");
return;
}

@ -24,7 +24,8 @@ public enum ExceptionEnums {
ONE_MINUTE_LIMIT(-16,"1分钟限流规则生效无法发送短信"),
ONE_HOUR_LIMIT(-17,"1小时限流规则生效无法发送短信"),
NO_CHANNEL(-18,"没有选择到合适的通道!"),
SEARCH_INDEX_ERROR(-19,"添加文档信息失败!")
SEARCH_INDEX_ERROR(-19,"添加文档信息失败!"),
SEARCH_UPDATE_ERROR(-20,"修改文档信息失败!")
;
private Integer code;

@ -75,4 +75,9 @@ public class StandardReport implements Serializable {
*/
private Integer resendCount = 0;
/**
* false
*/
private Boolean reUpdate = false;
}

@ -0,0 +1,46 @@
package com.mashibing.search.mq;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.model.StandardReport;
import com.mashibing.search.service.SearchService;
import com.mashibing.search.utils.SearchUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* <p>TODO</p>
*
* @author
* @version V1.0.0
* @date 2023/6/20 22:34
*/
@Component
@Slf4j
public class SmsUpdateLogListener {
@Autowired
private SearchService searchService;
@RabbitListener(queues = {RabbitMQConstants.SMS_GATEWAY_DEAD_QUEUE})
public void consume(StandardReport report, Channel channel, Message message) throws IOException {
log.info("【搜素模块-修改日志】 接收到修改日志的消息 report = {}", report);
// 将report对象存储ThreadLocal中方便在搜索模块中获取
SearchUtils.set(report);
// 调用搜索模块完成的修改操作
Map<String,Object> doc = new HashMap<>();
doc.put("reportState",report.getReportState());
searchService.update(SearchUtils.INDEX + SearchUtils.getYear(),report.getSequenceId().toString(),doc);
// ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}

@ -4,6 +4,7 @@ import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.common.util.JsonUtil;
import com.mashibing.search.service.SearchService;
import com.mashibing.search.utils.SearchUtils;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
@ -26,7 +27,7 @@ public class SmsWriteLogListener {
@Autowired
private SearchService searchService;
private final String INDEX = "sms_submit_log_";
@ -34,15 +35,13 @@ public class SmsWriteLogListener {
public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException {
//1、调用搜索模块的添加方法完成添加操作
log.info("接收到存储日志的信息 submit = {}",submit);
searchService.index(INDEX + getYear(),submit.getSequenceId().toString(), JsonUtil.obj2JSON(submit));
searchService.index(SearchUtils.INDEX + SearchUtils.getYear(),submit.getSequenceId().toString(), JsonUtil.obj2JSON(submit));
//2、手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
public String getYear(){
return LocalDateTime.now().getYear() + "";
}
}

@ -1,6 +1,7 @@
package com.mashibing.search.service;
import java.io.IOException;
import java.util.Map;
/**
* @author zjw
@ -16,4 +17,22 @@ public interface SearchService {
*/
void index(String index,String id,String json) throws IOException;
/**
*
* @param index
* @param id
* @return true false
* @throws IOException
*/
boolean exists(String index,String id) throws IOException;
/**
*
* @param index
* @param id id
* @param doc key-value
* @throws IOException
*/
void update(String index, String id, Map<String,Object> doc)throws IOException;
}

@ -1,19 +1,27 @@
package com.mashibing.search.service.impl;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.SearchException;
import com.mashibing.common.model.StandardReport;
import com.mashibing.search.service.SearchService;
import com.mashibing.search.utils.SearchUtils;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Map;
/**
* @author zjw
@ -27,6 +35,14 @@ public class ElasticsearchServiceImpl implements SearchService {
*/
private final String CREATED = "created";
/**
* result
*/
private final String UPDATED = "updated";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RestHighLevelClient restHighLevelClient;
@ -42,7 +58,6 @@ public class ElasticsearchServiceImpl implements SearchService {
//3、将request信息发送给ES服务
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
//4、校验添加是否成功
String result = response.getResult().getLowercase();
if(!CREATED.equals(result)){
@ -52,4 +67,81 @@ public class ElasticsearchServiceImpl implements SearchService {
}
log.info("【搜索模块-写入数据成功】 索引添加成功index = {},id = {},json = {},result = {}",index,id,json,result);
}
@Override
public boolean exists(String index, String id) throws IOException {
// 构建GetRequest查看索引是否存在
GetRequest request = new GetRequest();
// 指定索引信息还有文档id
request.index(index);
request.id(id);
// 基于restHighLevelClient将查询指定id的文档是否存在的请求投递过去。
boolean exists = restHighLevelClient.exists(request, RequestOptions.DEFAULT);
// 直接返回信息
return exists;
}
@Override
public void update(String index, String id, Map<String, Object> doc) throws IOException {
//1、基于exists方法查询当前文档是否存在
boolean exists = exists(index, id);
if(!exists){
// 当前文档不存在
StandardReport report = SearchUtils.get();
if(report.getReUpdate()){
// 第二次获取投递的消息到这已经是延迟20s了。
log.error("【搜索模块-修改日志】 修改日志失败report = {}",report);
}else{
// 第一次投递可以再次将消息仍会MQ中
// 开始第二次消息的投递了
report.setReUpdate(true);
rabbitTemplate.convertAndSend(RabbitMQConstants.SMS_GATEWAY_NORMAL_QUEUE,report);
}
SearchUtils.remove();
return;
}
//2、到这可以确认文档是存在的直接做修改操作
UpdateRequest request = new UpdateRequest();
request.index(index);
request.id(id);
request.doc(doc);
UpdateResponse update = restHighLevelClient.update(request, RequestOptions.DEFAULT);
String result = update.getResult().getLowercase();
if(!UPDATED.equals(result)){
// 添加失败!!
log.error("【搜索模块-修改日志失败】 index = {},id = {},doc = {}",index,id,doc);
throw new SearchException(ExceptionEnums.SEARCH_UPDATE_ERROR);
}
log.info("【搜索模块-修改日志成功】 文档修改成功index = {},id = {},doc = {}",index,id,doc);
}
}

@ -0,0 +1,46 @@
package com.mashibing.search.utils;
import com.mashibing.common.model.StandardReport;
import java.time.LocalDateTime;
/**
* <p>TODO</p>
*
* @author
* @version V1.0.0
* @date 2023/6/20 22:54
*/
public class SearchUtils {
/**
*
*/
public static final String INDEX = "sms_submit_log_";
/**
*
* @return
*/
public static String getYear(){
return LocalDateTime.now().getYear() + "";
}
// ThreadLocal操作
private static ThreadLocal<StandardReport> reportThreadLocal = new ThreadLocal<>();
public static void set(StandardReport report){
reportThreadLocal.set(report);
}
public static StandardReport get(){
return reportThreadLocal.get();
}
public static void remove(){
reportThreadLocal.remove();
}
}

@ -1,5 +1,6 @@
package com.mashibing.search.service;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@ -20,4 +21,9 @@ public class SearchServiceTest {
public void index() throws IOException {
searchService.index("sms_submit_log_2023","3","{\"clientId\": 3}");
}
@Test
public void exists() throws IOException {
System.out.println(searchService.exists("sms_submit_log_2023", "2349236478326478236478"));
}
}
Loading…
Cancel
Save