From e6d3e03ac1e10e0db6ec8a2d2984cf63137e6987 Mon Sep 17 00:00:00 2001 From: Administrator Date: Fri, 21 Apr 2023 20:56:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=90=9C=E7=B4=A2=E6=A8=A1=E5=9D=97=EF=BC=9A?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E5=86=99=E5=85=A5=E6=97=A5=E5=BF=97=E3=80=82?= =?UTF-8?q?=20=E6=8E=A8=E9=80=81=E6=A8=A1=E5=9D=97=EF=BC=9A=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E7=8A=B6=E6=80=81=E6=8A=A5=E5=91=8A=E6=8E=A8=E9=80=81?= =?UTF-8?q?=E5=8F=8A=E9=87=8D=E8=AF=95=E6=93=8D=E4=BD=9C=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- beacon-common/pom.xml | 10 ++ .../common/enums/ExceptionEnums.java | 1 + .../common/exception/SearchException.java | 27 ++++ .../common/model/StandardReport.java | 11 ++ .../common/model/StandardSubmit.java | 6 + .../com/mashibing/common/util/JsonUtil.java | 23 +++ beacon-push/pom.xml | 43 ++++++ .../com/mashibing/push/PushStarterApp.java | 19 +++ .../mashibing/push/config/RabbitMQConfig.java | 46 ++++++ .../push/config/RestTemplateConfig.java | 19 +++ .../mashibing/push/mq/PushReportListener.java | 142 ++++++++++++++++++ beacon-push/src/main/resources/bootstrap.yml | 17 +++ beacon-search/pom.xml | 59 ++++++++ .../mashibing/search/SearchStarterApp.java | 18 +++ .../config/RestHighLevelClientConfig.java | 60 ++++++++ .../search/mq/SmsWriteLogListener.java | 46 ++++++ .../search/service/SearchService.java | 19 +++ .../impl/ElasticsearchServiceImpl.java | 55 +++++++ .../src/main/resources/bootstrap.yml | 17 +++ .../search/service/SearchServiceTest.java | 23 +++ .../com/mashibing/test/TestStarterApp.java | 9 +- pom.xml | 2 + 22 files changed, 668 insertions(+), 4 deletions(-) create mode 100644 beacon-common/src/main/java/com/mashibing/common/exception/SearchException.java create mode 100644 beacon-common/src/main/java/com/mashibing/common/util/JsonUtil.java create mode 100644 beacon-push/pom.xml create mode 100644 beacon-push/src/main/java/com/mashibing/push/PushStarterApp.java create mode 100644 beacon-push/src/main/java/com/mashibing/push/config/RabbitMQConfig.java create mode 100644 beacon-push/src/main/java/com/mashibing/push/config/RestTemplateConfig.java create mode 100644 beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java create mode 100644 beacon-push/src/main/resources/bootstrap.yml create mode 100644 beacon-search/pom.xml create mode 100644 beacon-search/src/main/java/com/mashibing/search/SearchStarterApp.java create mode 100644 beacon-search/src/main/java/com/mashibing/search/config/RestHighLevelClientConfig.java create mode 100644 beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java create mode 100644 beacon-search/src/main/java/com/mashibing/search/service/SearchService.java create mode 100644 beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticsearchServiceImpl.java create mode 100644 beacon-search/src/main/resources/bootstrap.yml create mode 100644 beacon-search/src/test/java/com/mashibing/search/service/SearchServiceTest.java diff --git a/beacon-common/pom.xml b/beacon-common/pom.xml index 1838d8e..1a70b42 100644 --- a/beacon-common/pom.xml +++ b/beacon-common/pom.xml @@ -21,6 +21,16 @@ spring-context 5.3.12 + + com.fasterxml.jackson.core + jackson-databind + 2.12.5 + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + 2.12.5 + \ No newline at end of file diff --git a/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java b/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java index 23b337c..6ceda38 100644 --- a/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java +++ b/beacon-common/src/main/java/com/mashibing/common/enums/ExceptionEnums.java @@ -24,6 +24,7 @@ public enum ExceptionEnums { ONE_MINUTE_LIMIT(-16,"1分钟限流规则生效,无法发送短信"), ONE_HOUR_LIMIT(-17,"1小时限流规则生效,无法发送短信"), NO_CHANNEL(-18,"没有选择到合适的通道!"), + SEARCH_INDEX_ERROR(-19,"添加文档信息失败!") ; private Integer code; diff --git a/beacon-common/src/main/java/com/mashibing/common/exception/SearchException.java b/beacon-common/src/main/java/com/mashibing/common/exception/SearchException.java new file mode 100644 index 0000000..56c7c9d --- /dev/null +++ b/beacon-common/src/main/java/com/mashibing/common/exception/SearchException.java @@ -0,0 +1,27 @@ +package com.mashibing.common.exception; + +import com.mashibing.common.enums.ExceptionEnums; +import lombok.Getter; + +/** + * 搜索模块的异常对象 + * @author zjw + * @description + */ +@Getter +public class SearchException extends RuntimeException { + + private Integer code; + + public SearchException(String message, Integer code) { + super(message); + this.code = code; + } + + + public SearchException(ExceptionEnums enums) { + super(enums.getMsg()); + this.code = enums.getCode(); + } + +} diff --git a/beacon-common/src/main/java/com/mashibing/common/model/StandardReport.java b/beacon-common/src/main/java/com/mashibing/common/model/StandardReport.java index cefa39c..911c6c9 100644 --- a/beacon-common/src/main/java/com/mashibing/common/model/StandardReport.java +++ b/beacon-common/src/main/java/com/mashibing/common/model/StandardReport.java @@ -1,5 +1,9 @@ package com.mashibing.common.model; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -41,6 +45,8 @@ public class StandardReport implements Serializable { /** * 短信的发送时间,当前系统时间 */ + @JsonSerialize(using = LocalDateTimeSerializer.class) + @JsonDeserialize(using = LocalDateTimeDeserializer.class) private LocalDateTime sendTime; /** @@ -59,4 +65,9 @@ public class StandardReport implements Serializable { private Integer isCallback; private String callbackUrl; + /** + * 推送报告重试次数 + */ + private Integer resendCount = 0; + } diff --git a/beacon-common/src/main/java/com/mashibing/common/model/StandardSubmit.java b/beacon-common/src/main/java/com/mashibing/common/model/StandardSubmit.java index 108a8fd..c929674 100644 --- a/beacon-common/src/main/java/com/mashibing/common/model/StandardSubmit.java +++ b/beacon-common/src/main/java/com/mashibing/common/model/StandardSubmit.java @@ -1,5 +1,9 @@ package com.mashibing.common.model; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; +import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -57,6 +61,8 @@ public class StandardSubmit implements Serializable { /** * 短信的发送时间,当前系统时间 */ + @JsonSerialize(using = LocalDateTimeSerializer.class) + @JsonDeserialize(using = LocalDateTimeDeserializer.class) private LocalDateTime sendTime; diff --git a/beacon-common/src/main/java/com/mashibing/common/util/JsonUtil.java b/beacon-common/src/main/java/com/mashibing/common/util/JsonUtil.java new file mode 100644 index 0000000..f37474f --- /dev/null +++ b/beacon-common/src/main/java/com/mashibing/common/util/JsonUtil.java @@ -0,0 +1,23 @@ +package com.mashibing.common.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * @author zjw + * @description + */ +public class JsonUtil { + + private static ObjectMapper objectMapper = new ObjectMapper(); + + public static String obj2JSON(Object obj){ + try { + return objectMapper.writeValueAsString(obj); + } catch (JsonProcessingException e) { + e.printStackTrace(); + throw new RuntimeException("转换JSON失败!"); + } + } + +} diff --git a/beacon-push/pom.xml b/beacon-push/pom.xml new file mode 100644 index 0000000..9596328 --- /dev/null +++ b/beacon-push/pom.xml @@ -0,0 +1,43 @@ + + + + beacon-cloud + com.mashibing + 1.0-SNAPSHOT + + 4.0.0 + + beacon-push + + + + + org.springframework.boot + spring-boot-starter-web + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + com.mashibing + beacon-common + 1.0-SNAPSHOT + + + + org.springframework.boot + spring-boot-starter-amqp + + + + \ No newline at end of file diff --git a/beacon-push/src/main/java/com/mashibing/push/PushStarterApp.java b/beacon-push/src/main/java/com/mashibing/push/PushStarterApp.java new file mode 100644 index 0000000..b59168f --- /dev/null +++ b/beacon-push/src/main/java/com/mashibing/push/PushStarterApp.java @@ -0,0 +1,19 @@ +package com.mashibing.push; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; + +/** + * @author zjw + * @description + */ +@SpringBootApplication +@EnableDiscoveryClient +public class PushStarterApp { + + public static void main(String[] args) { + SpringApplication.run(PushStarterApp.class,args); + } + +} diff --git a/beacon-push/src/main/java/com/mashibing/push/config/RabbitMQConfig.java b/beacon-push/src/main/java/com/mashibing/push/config/RabbitMQConfig.java new file mode 100644 index 0000000..3b3e16d --- /dev/null +++ b/beacon-push/src/main/java/com/mashibing/push/config/RabbitMQConfig.java @@ -0,0 +1,46 @@ +package com.mashibing.push.config; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author zjw + * @description + */ +@Configuration +public class RabbitMQConfig { + + public static final String DELAYED_EXCHANGE = "push_delayed_exchange"; + + public static final String DELAYED_QUEUE = "push_delayed_queue"; + + private static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message"; + + private static final String DELAYED_ROUTING_TYPE_KEY = "x-delayed-type"; + + private static final String DELAYED_ROUTING_TYPE_FANOUT = "fanout"; + + @Bean + public Exchange delayedExchange(){ + Map args = new HashMap<>(); + args.put(DELAYED_ROUTING_TYPE_KEY,DELAYED_ROUTING_TYPE_FANOUT); + Exchange delayedExchange = new CustomExchange(DELAYED_EXCHANGE,DELAYED_EXCHANGE_TYPE,false,false,args); + return delayedExchange; + } + + @Bean + public Queue delayedQueue(){ + return QueueBuilder.durable(DELAYED_QUEUE).build(); + } + + @Bean + public Binding delayedBinding(Exchange delayedExchange,Queue delayedQueue){ + return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("").noargs(); + } + + +} diff --git a/beacon-push/src/main/java/com/mashibing/push/config/RestTemplateConfig.java b/beacon-push/src/main/java/com/mashibing/push/config/RestTemplateConfig.java new file mode 100644 index 0000000..0edee8b --- /dev/null +++ b/beacon-push/src/main/java/com/mashibing/push/config/RestTemplateConfig.java @@ -0,0 +1,19 @@ +package com.mashibing.push.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.client.RestTemplate; + +/** + * @author zjw + * @description + */ +@Configuration +public class RestTemplateConfig { + + @Bean + public RestTemplate restTemplate(){ + return new RestTemplate(); + } + +} diff --git a/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java b/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java new file mode 100644 index 0000000..aba44d1 --- /dev/null +++ b/beacon-push/src/main/java/com/mashibing/push/mq/PushReportListener.java @@ -0,0 +1,142 @@ +package com.mashibing.push.mq; + +import com.mashibing.common.constant.RabbitMQConstants; +import com.mashibing.common.model.StandardReport; +import com.mashibing.common.util.JsonUtil; +import com.mashibing.push.config.RabbitMQConfig; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * @author zjw + * @description + */ +@Component +@Slf4j +public class PushReportListener { + + // 重试的时间间隔。 + private int[] delayTime = {0,15000,30000,60000,300000}; + + private final String SUCCESS = "SUCCESS"; + + @Autowired + private RestTemplate restTemplate; + + @Autowired + private RabbitTemplate rabbitTemplate; + + + /** + * 监控策略模块推送过来的消息(暂时是策略) + * @param report + * @param channel + * @param message + * @throws IOException + */ + @RabbitListener(queues = RabbitMQConstants.SMS_PUSH_REPORT) + public void consume(StandardReport report, Channel channel,Message message) throws IOException { + //1、获取客户的回调地址 + String callbackUrl = report.getCallbackUrl(); + if(StringUtils.isEmpty(callbackUrl)){ + log.info("【推送模块-推送状态报告】 客户方没有设置回调的地址信息!callbackUrl = {} ",callbackUrl); + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + return; + } + + //2、发送状态报告 + boolean flag = pushReport(report); + + //3、如果发送失败,重试 + isResend(report, flag); + + //4、直接手动ack + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + + + /** + * 监听延迟交换机路由过来的消息 + * @param report + * @param channel + * @param message + * @throws IOException + */ + @RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE) + public void delayedConsume(StandardReport report, Channel channel,Message message) throws IOException { + // 1、发送状态报告 + boolean flag = pushReport(report); + + // 2、判断状态报告发送情况 + isResend(report, flag); + + // 手动ack + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + + + /** + * 发送请求,给callbackUrl + * @param report + * @return + */ + private boolean pushReport(StandardReport report) { + // 声明返回结果,你默认为false + boolean flag = false; + + //1、声明发送的参数 + String body = JsonUtil.obj2JSON(report); + + //2、声明RestTemplate的模板代码 + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.APPLICATION_JSON); + try { + log.info("【推送模块-推送状态报告】 第{}次推送状态报告开始!report = {}",report.getResendCount() + 1,report); + String result = restTemplate.postForObject("http://" + report.getCallbackUrl(), new HttpEntity<>(body, httpHeaders), String.class); + flag = SUCCESS.equals(result); + } catch (RestClientException e) { + } + //3、得到响应后,确认是否为SUCCESS + return flag; + } + /** + * 判断状态报告是否推送成功,失败的话需要发送重试消息 + * @param report + * @param flag + */ + private void isResend(StandardReport report, boolean flag) { + if(!flag){ + log.info("【推送模块-推送状态报告】 第{}次推送状态报告失败!report = {}",report.getResendCount() + 1,report); + report.setResendCount(report.getResendCount() + 1); + rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE, "", report, new MessagePostProcessor() { + @Override + public Message postProcessMessage(Message message) throws AmqpException { + // 设置延迟时间 + message.getMessageProperties().setDelay(delayTime[report.getResendCount()]); + return message; + } + }); + }else{ + log.info("【推送模块-推送状态报告】 第一次推送状态报告成功!report = {}",report); + } + } + + +} diff --git a/beacon-push/src/main/resources/bootstrap.yml b/beacon-push/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..1ae7a7a --- /dev/null +++ b/beacon-push/src/main/resources/bootstrap.yml @@ -0,0 +1,17 @@ +# 服务名称 +spring: + application: + name: beacon-push + # 多环境 + profiles: + active: dev + # nacos注册中心地址 + cloud: + nacos: + discovery: + server-addr: 114.116.226.76:8848 + # nacos配置中心地址: + config: + server-addr: 114.116.226.76:8848 + file-extension: yml + # beacon-api-dev.yml diff --git a/beacon-search/pom.xml b/beacon-search/pom.xml new file mode 100644 index 0000000..3761ae8 --- /dev/null +++ b/beacon-search/pom.xml @@ -0,0 +1,59 @@ + + + + beacon-cloud + com.mashibing + 1.0-SNAPSHOT + + 4.0.0 + + beacon-search + + + + + org.springframework.boot + spring-boot-starter-web + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + com.mashibing + beacon-common + 1.0-SNAPSHOT + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + 7.6.2 + + + org.elasticsearch + elasticsearch + 7.6.2 + + + + org.springframework.boot + spring-boot-starter-test + + + + \ No newline at end of file diff --git a/beacon-search/src/main/java/com/mashibing/search/SearchStarterApp.java b/beacon-search/src/main/java/com/mashibing/search/SearchStarterApp.java new file mode 100644 index 0000000..6c04da8 --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/SearchStarterApp.java @@ -0,0 +1,18 @@ +package com.mashibing.search; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.client.discovery.EnableDiscoveryClient; + +/** + * @author zjw + * @description + */ +@SpringBootApplication +@EnableDiscoveryClient +public class SearchStarterApp { + + public static void main(String[] args) { + SpringApplication.run(SearchStarterApp.class,args); + } +} diff --git a/beacon-search/src/main/java/com/mashibing/search/config/RestHighLevelClientConfig.java b/beacon-search/src/main/java/com/mashibing/search/config/RestHighLevelClientConfig.java new file mode 100644 index 0000000..992c557 --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/config/RestHighLevelClientConfig.java @@ -0,0 +1,60 @@ +package com.mashibing.search.config; + +import com.netflix.ribbon.proxy.annotation.Http; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.List; + +/** + * @author zjw + * @description + */ +@Configuration +public class RestHighLevelClientConfig { + + @Value("#{'${elasticsearch.hostAndPorts}'.split(',')}") + private List hostAndPorts; + + @Value("${elasticsearch.username}") + private String username; + + + @Value("${elasticsearch.password}") + private String password; + + @Bean + public RestHighLevelClient restHighLevelClient(){ + // 初始化连接ES的HttpHost信息 + HttpHost[] httpHosts = new HttpHost[hostAndPorts.size()]; + for (int i = 0; i < hostAndPorts.size(); i++) { + String[] hostAndPort = hostAndPorts.get(i).split(":"); + httpHosts[i] = new HttpHost(hostAndPort[0],Integer.parseInt(hostAndPort[1])); + } + + // 设置认证信息 + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(username,password)); + + // 构建时设置连接信息,基于set设置认证信息 + RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); + restClientBuilder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider)); + + // 构建连接ES的client对象 + RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder); + + // 返回 + return restHighLevelClient; + } +} + diff --git a/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java new file mode 100644 index 0000000..a040205 --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/mq/SmsWriteLogListener.java @@ -0,0 +1,46 @@ +package com.mashibing.search.mq; + +import com.mashibing.common.constant.RabbitMQConstants; +import com.mashibing.common.model.StandardSubmit; +import com.mashibing.common.util.JsonUtil; +import com.mashibing.search.service.SearchService; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.time.LocalDateTime; + +/** + * @author zjw + * @description + */ +@Component +@Slf4j +public class SmsWriteLogListener { + + @Autowired + private SearchService searchService; + + private final String INDEX = "sms_submit_log_"; + + + @RabbitListener(queues = RabbitMQConstants.SMS_WRITE_LOG) + public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException { + //1、调用搜索模块的添加方法,完成添加操作 + log.info("接收到存储日志的信息 submit = {}",submit); + searchService.index(INDEX + getYear(),submit.getSequenceId().toString(), JsonUtil.obj2JSON(submit)); + + //2、手动ack + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } + + + public String getYear(){ + return LocalDateTime.now().getYear() + ""; + } + +} diff --git a/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java b/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java new file mode 100644 index 0000000..bb9d9c3 --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/service/SearchService.java @@ -0,0 +1,19 @@ +package com.mashibing.search.service; + +import java.io.IOException; + +/** + * @author zjw + * @description + */ +public interface SearchService { + + /** + * 向es中添加一行文档 + * @param index 索引信息 + * @param id 文档id + * @param json 具体的文档内容 + */ + void index(String index,String id,String json) throws IOException; + +} diff --git a/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticsearchServiceImpl.java b/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticsearchServiceImpl.java new file mode 100644 index 0000000..bb86ab4 --- /dev/null +++ b/beacon-search/src/main/java/com/mashibing/search/service/impl/ElasticsearchServiceImpl.java @@ -0,0 +1,55 @@ +package com.mashibing.search.service.impl; + + +import com.mashibing.common.enums.ExceptionEnums; +import com.mashibing.common.exception.SearchException; +import com.mashibing.search.service.SearchService; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.io.IOException; + +/** + * @author zjw + * @description + */ +@Service +@Slf4j +public class ElasticsearchServiceImpl implements SearchService { + /** + * 添加成功的result + */ + private final String CREATED = "created"; + + @Autowired + private RestHighLevelClient restHighLevelClient; + + @Override + public void index(String index, String id, String json) throws IOException { + //1、构建插入数据的Request + IndexRequest request = new IndexRequest(); + + //2、给request对象封装索引信息,文档id,以及文档内容 + request.index(index); + request.id(id); + request.source(json, XContentType.JSON); + + //3、将request信息发送给ES服务 + IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT); + + //4、校验添加是否成功 + String result = response.getResult().getLowercase(); + if(!CREATED.equals(result)){ + // 添加失败!! + log.error("【搜索模块-写入数据失败】 index = {},id = {},json = {},result = {}",index,id,json,result); + throw new SearchException(ExceptionEnums.SEARCH_INDEX_ERROR); + } + log.info("【搜索模块-写入数据成功】 索引添加成功index = {},id = {},json = {},result = {}",index,id,json,result); + } +} diff --git a/beacon-search/src/main/resources/bootstrap.yml b/beacon-search/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..a1ad426 --- /dev/null +++ b/beacon-search/src/main/resources/bootstrap.yml @@ -0,0 +1,17 @@ +# 服务名称 +spring: + application: + name: beacon-search + # 多环境 + profiles: + active: dev + # nacos注册中心地址 + cloud: + nacos: + discovery: + server-addr: 114.116.226.76:8848 + # nacos配置中心地址: + config: + server-addr: 114.116.226.76:8848 + file-extension: yml + # beacon-search-dev.yml diff --git a/beacon-search/src/test/java/com/mashibing/search/service/SearchServiceTest.java b/beacon-search/src/test/java/com/mashibing/search/service/SearchServiceTest.java new file mode 100644 index 0000000..4cbcd77 --- /dev/null +++ b/beacon-search/src/test/java/com/mashibing/search/service/SearchServiceTest.java @@ -0,0 +1,23 @@ +package com.mashibing.search.service; + +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.io.IOException; + +import static org.junit.Assert.*; + +@SpringBootTest +@RunWith(SpringRunner.class) +public class SearchServiceTest { + + @Autowired + private SearchService searchService; + + @org.junit.Test + public void index() throws IOException { + searchService.index("sms_submit_log_2023","3","{\"clientId\": 3}"); + } +} \ No newline at end of file diff --git a/beacon-test/src/main/java/com/mashibing/test/TestStarterApp.java b/beacon-test/src/main/java/com/mashibing/test/TestStarterApp.java index 2271d6a..68642ca 100644 --- a/beacon-test/src/main/java/com/mashibing/test/TestStarterApp.java +++ b/beacon-test/src/main/java/com/mashibing/test/TestStarterApp.java @@ -6,10 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.openfeign.EnableFeignClients; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * @author zjw @@ -23,6 +20,10 @@ public class TestStarterApp { public static void main(String[] args) { SpringApplication.run(TestStarterApp.class,args); + } + + + } diff --git a/pom.xml b/pom.xml index 561505a..137dea2 100644 --- a/pom.xml +++ b/pom.xml @@ -9,6 +9,8 @@ beacon-cache beacon-test beacon-strategy + beacon-search + beacon-push org.springframework.boot