搜索模块:完成写入日志。

推送模块:完成状态报告推送及重试操作。
master
Administrator 2 years ago
parent a2bbb59feb
commit e6d3e03ac1

@ -21,6 +21,16 @@
<artifactId>spring-context</artifactId>
<version>5.3.12</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.12.5</version>
</dependency>
</dependencies>
</project>

@ -24,6 +24,7 @@ public enum ExceptionEnums {
ONE_MINUTE_LIMIT(-16,"1分钟限流规则生效无法发送短信"),
ONE_HOUR_LIMIT(-17,"1小时限流规则生效无法发送短信"),
NO_CHANNEL(-18,"没有选择到合适的通道!"),
SEARCH_INDEX_ERROR(-19,"添加文档信息失败!")
;
private Integer code;

@ -0,0 +1,27 @@
package com.mashibing.common.exception;
import com.mashibing.common.enums.ExceptionEnums;
import lombok.Getter;
/**
*
* @author zjw
* @description
*/
@Getter
public class SearchException extends RuntimeException {
private Integer code;
public SearchException(String message, Integer code) {
super(message);
this.code = code;
}
public SearchException(ExceptionEnums enums) {
super(enums.getMsg());
this.code = enums.getCode();
}
}

@ -1,5 +1,9 @@
package com.mashibing.common.model;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -41,6 +45,8 @@ public class StandardReport implements Serializable {
/**
*
*/
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
private LocalDateTime sendTime;
/**
@ -59,4 +65,9 @@ public class StandardReport implements Serializable {
private Integer isCallback;
private String callbackUrl;
/**
*
*/
private Integer resendCount = 0;
}

@ -1,5 +1,9 @@
package com.mashibing.common.model;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -57,6 +61,8 @@ public class StandardSubmit implements Serializable {
/**
*
*/
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
private LocalDateTime sendTime;

@ -0,0 +1,23 @@
package com.mashibing.common.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author zjw
* @description
*/
public class JsonUtil {
private static ObjectMapper objectMapper = new ObjectMapper();
public static String obj2JSON(Object obj){
try {
return objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
e.printStackTrace();
throw new RuntimeException("转换JSON失败");
}
}
}

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>beacon-cloud</artifactId>
<groupId>com.mashibing</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>beacon-push</artifactId>
<dependencies>
<!-- start-web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- nacos-dis-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- nacos-config-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- 公共组件common-->
<dependency>
<groupId>com.mashibing</groupId>
<artifactId>beacon-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,19 @@
package com.mashibing.push;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* @author zjw
* @description
*/
@SpringBootApplication
@EnableDiscoveryClient
public class PushStarterApp {
public static void main(String[] args) {
SpringApplication.run(PushStarterApp.class,args);
}
}

@ -0,0 +1,46 @@
package com.mashibing.push.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author zjw
* @description
*/
@Configuration
public class RabbitMQConfig {
public static final String DELAYED_EXCHANGE = "push_delayed_exchange";
public static final String DELAYED_QUEUE = "push_delayed_queue";
private static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";
private static final String DELAYED_ROUTING_TYPE_KEY = "x-delayed-type";
private static final String DELAYED_ROUTING_TYPE_FANOUT = "fanout";
@Bean
public Exchange delayedExchange(){
Map<String, Object> args = new HashMap<>();
args.put(DELAYED_ROUTING_TYPE_KEY,DELAYED_ROUTING_TYPE_FANOUT);
Exchange delayedExchange = new CustomExchange(DELAYED_EXCHANGE,DELAYED_EXCHANGE_TYPE,false,false,args);
return delayedExchange;
}
@Bean
public Queue delayedQueue(){
return QueueBuilder.durable(DELAYED_QUEUE).build();
}
@Bean
public Binding delayedBinding(Exchange delayedExchange,Queue delayedQueue){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("").noargs();
}
}

@ -0,0 +1,19 @@
package com.mashibing.push.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
/**
* @author zjw
* @description
*/
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate(){
return new RestTemplate();
}
}

@ -0,0 +1,142 @@
package com.mashibing.push.mq;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.model.StandardReport;
import com.mashibing.common.util.JsonUtil;
import com.mashibing.push.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author zjw
* @description
*/
@Component
@Slf4j
public class PushReportListener {
// 重试的时间间隔。
private int[] delayTime = {0,15000,30000,60000,300000};
private final String SUCCESS = "SUCCESS";
@Autowired
private RestTemplate restTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*
* @param report
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues = RabbitMQConstants.SMS_PUSH_REPORT)
public void consume(StandardReport report, Channel channel,Message message) throws IOException {
//1、获取客户的回调地址
String callbackUrl = report.getCallbackUrl();
if(StringUtils.isEmpty(callbackUrl)){
log.info("【推送模块-推送状态报告】 客户方没有设置回调的地址信息callbackUrl = {} ",callbackUrl);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
return;
}
//2、发送状态报告
boolean flag = pushReport(report);
//3、如果发送失败重试
isResend(report, flag);
//4、直接手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
/**
*
* @param report
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE)
public void delayedConsume(StandardReport report, Channel channel,Message message) throws IOException {
// 1、发送状态报告
boolean flag = pushReport(report);
// 2、判断状态报告发送情况
isResend(report, flag);
// 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
/**
* callbackUrl
* @param report
* @return
*/
private boolean pushReport(StandardReport report) {
// 声明返回结果你默认为false
boolean flag = false;
//1、声明发送的参数
String body = JsonUtil.obj2JSON(report);
//2、声明RestTemplate的模板代码
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.APPLICATION_JSON);
try {
log.info("【推送模块-推送状态报告】 第{}次推送状态报告开始report = {}",report.getResendCount() + 1,report);
String result = restTemplate.postForObject("http://" + report.getCallbackUrl(), new HttpEntity<>(body, httpHeaders), String.class);
flag = SUCCESS.equals(result);
} catch (RestClientException e) {
}
//3、得到响应后确认是否为SUCCESS
return flag;
}
/**
*
* @param report
* @param flag
*/
private void isResend(StandardReport report, boolean flag) {
if(!flag){
log.info("【推送模块-推送状态报告】 第{}次推送状态报告失败report = {}",report.getResendCount() + 1,report);
report.setResendCount(report.getResendCount() + 1);
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE, "", report, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置延迟时间
message.getMessageProperties().setDelay(delayTime[report.getResendCount()]);
return message;
}
});
}else{
log.info("【推送模块-推送状态报告】 第一次推送状态报告成功report = {}",report);
}
}
}

@ -0,0 +1,17 @@
# 服务名称
spring:
application:
name: beacon-push
# 多环境
profiles:
active: dev
# nacos注册中心地址
cloud:
nacos:
discovery:
server-addr: 114.116.226.76:8848
# nacos配置中心地址:
config:
server-addr: 114.116.226.76:8848
file-extension: yml
# beacon-api-dev.yml

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>beacon-cloud</artifactId>
<groupId>com.mashibing</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>beacon-search</artifactId>
<dependencies>
<!-- start-web-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- nacos-dis-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- nacos-config-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- 公共组件common-->
<dependency>
<groupId>com.mashibing</groupId>
<artifactId>beacon-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- elasticsearch的客户端依赖-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.6.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.6.2</version>
</dependency>
<!-- 测试组件-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,18 @@
package com.mashibing.search;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* @author zjw
* @description
*/
@SpringBootApplication
@EnableDiscoveryClient
public class SearchStarterApp {
public static void main(String[] args) {
SpringApplication.run(SearchStarterApp.class,args);
}
}

@ -0,0 +1,60 @@
package com.mashibing.search.config;
import com.netflix.ribbon.proxy.annotation.Http;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* @author zjw
* @description
*/
@Configuration
public class RestHighLevelClientConfig {
@Value("#{'${elasticsearch.hostAndPorts}'.split(',')}")
private List<String> hostAndPorts;
@Value("${elasticsearch.username}")
private String username;
@Value("${elasticsearch.password}")
private String password;
@Bean
public RestHighLevelClient restHighLevelClient(){
// 初始化连接ES的HttpHost信息
HttpHost[] httpHosts = new HttpHost[hostAndPorts.size()];
for (int i = 0; i < hostAndPorts.size(); i++) {
String[] hostAndPort = hostAndPorts.get(i).split(":");
httpHosts[i] = new HttpHost(hostAndPort[0],Integer.parseInt(hostAndPort[1]));
}
// 设置认证信息
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(username,password));
// 构建时设置连接信息基于set设置认证信息
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
restClientBuilder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
// 构建连接ES的client对象
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
// 返回
return restHighLevelClient;
}
}

@ -0,0 +1,46 @@
package com.mashibing.search.mq;
import com.mashibing.common.constant.RabbitMQConstants;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.common.util.JsonUtil;
import com.mashibing.search.service.SearchService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
/**
* @author zjw
* @description
*/
@Component
@Slf4j
public class SmsWriteLogListener {
@Autowired
private SearchService searchService;
private final String INDEX = "sms_submit_log_";
@RabbitListener(queues = RabbitMQConstants.SMS_WRITE_LOG)
public void consume(StandardSubmit submit, Channel channel, Message message) throws IOException {
//1、调用搜索模块的添加方法完成添加操作
log.info("接收到存储日志的信息 submit = {}",submit);
searchService.index(INDEX + getYear(),submit.getSequenceId().toString(), JsonUtil.obj2JSON(submit));
//2、手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
public String getYear(){
return LocalDateTime.now().getYear() + "";
}
}

@ -0,0 +1,19 @@
package com.mashibing.search.service;
import java.io.IOException;
/**
* @author zjw
* @description
*/
public interface SearchService {
/**
* es
* @param index
* @param id id
* @param json
*/
void index(String index,String id,String json) throws IOException;
}

@ -0,0 +1,55 @@
package com.mashibing.search.service.impl;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.SearchException;
import com.mashibing.search.service.SearchService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @author zjw
* @description
*/
@Service
@Slf4j
public class ElasticsearchServiceImpl implements SearchService {
/**
* result
*/
private final String CREATED = "created";
@Autowired
private RestHighLevelClient restHighLevelClient;
@Override
public void index(String index, String id, String json) throws IOException {
//1、构建插入数据的Request
IndexRequest request = new IndexRequest();
//2、给request对象封装索引信息文档id以及文档内容
request.index(index);
request.id(id);
request.source(json, XContentType.JSON);
//3、将request信息发送给ES服务
IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
//4、校验添加是否成功
String result = response.getResult().getLowercase();
if(!CREATED.equals(result)){
// 添加失败!!
log.error("【搜索模块-写入数据失败】 index = {},id = {},json = {},result = {}",index,id,json,result);
throw new SearchException(ExceptionEnums.SEARCH_INDEX_ERROR);
}
log.info("【搜索模块-写入数据成功】 索引添加成功index = {},id = {},json = {},result = {}",index,id,json,result);
}
}

@ -0,0 +1,17 @@
# 服务名称
spring:
application:
name: beacon-search
# 多环境
profiles:
active: dev
# nacos注册中心地址
cloud:
nacos:
discovery:
server-addr: 114.116.226.76:8848
# nacos配置中心地址:
config:
server-addr: 114.116.226.76:8848
file-extension: yml
# beacon-search-dev.yml

@ -0,0 +1,23 @@
package com.mashibing.search.service;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import static org.junit.Assert.*;
@SpringBootTest
@RunWith(SpringRunner.class)
public class SearchServiceTest {
@Autowired
private SearchService searchService;
@org.junit.Test
public void index() throws IOException {
searchService.index("sms_submit_log_2023","3","{\"clientId\": 3}");
}
}

@ -6,10 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
* @author zjw
@ -23,6 +20,10 @@ public class TestStarterApp {
public static void main(String[] args) {
SpringApplication.run(TestStarterApp.class,args);
}
}

@ -9,6 +9,8 @@
<module>beacon-cache</module>
<module>beacon-test</module>
<module>beacon-strategy</module>
<module>beacon-search</module>
<module>beacon-push</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>

Loading…
Cancel
Save