Compare commits

..

5 Commits

@ -24,6 +24,16 @@
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId> <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.12.5</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

@ -27,8 +27,7 @@ public enum ExceptionEnums {
LIMIT_MINUTE(-16, "达到分钟限流阈值!!"), LIMIT_MINUTE(-16, "达到分钟限流阈值!!"),
LIMIT_HOUR(-17, "达到小时限流阈值!!"), LIMIT_HOUR(-17, "达到小时限流阈值!!"),
NO_CHANNEL(-18, "没有可用通道!!!"), NO_CHANNEL(-18, "没有可用通道!!!"),
SEARCH_INDEX_ERROR(-20, "es添加一行文档失败");
;
private final int code; private final int code;

@ -0,0 +1,30 @@
package com.mashibing.common.exception;
import com.mashibing.common.enums.ExceptionEnums;
import lombok.Getter;
/**
* @author heqijun
* @ClassName: SearchException
* @Description:
* @date 2025/6/12 18:13
*/
@Getter
public class SearchException extends RuntimeException {
private static final long serialVersionUID = 1L;
private final Integer code;
public SearchException(Integer code, String message) {
super(message);
this.code = code;
}
public SearchException(ExceptionEnums exceptionEnums) {
super(exceptionEnums.getMsg());
this.code = exceptionEnums.getCode();
}
}

@ -1,5 +1,9 @@
package com.mashibing.common.pojo; package com.mashibing.common.pojo;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.mashibing.common.annotation.Description; import com.mashibing.common.annotation.Description;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
@ -44,6 +48,8 @@ public class StandardSubmit implements Serializable {
@Description("短信内容") @Description("短信内容")
private String text; private String text;
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@Description("短信的发送时间") @Description("短信的发送时间")
private LocalDateTime sendTime; private LocalDateTime sendTime;

@ -0,0 +1,24 @@
package com.mashibing.common.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author heqijun
* @ClassName: JsonUtil
* @Description: TODO()
* @date 2025/6/12 18:46
*/
public class JsonUtil {
private static ObjectMapper objectMapper = new ObjectMapper();
public static String obj2Json(Object obj) {
try {
return objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new RuntimeException("转换JSON失败");
}
}
}

@ -0,0 +1,58 @@
package com.mashibing.search.config;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author heqijun
* @ClassName: RestHighLevelClient
* @Description: elasticsearch
* @date 2025/6/12 17:09
*/
@Configuration
public class RestHighLevelClientConfig {
@Value("${elasticsearch.hostAndPorts}")
private String[] hostAndPorts;
@Value("${elasticsearch.username:elastic}")
private String username;
@Value("${elasticsearch.password}")
private String password;
@Bean
public RestHighLevelClient restHighLevelClient() {
HttpHost[] httpHost = new HttpHost[hostAndPorts.length];
for (int i = 0; i < httpHost.length; i++) {
String[] hostAndPort = hostAndPorts[i].split(":");
httpHost[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
}
// 设置认证信息
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
// 设置连接信息
RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
restClientBuilder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(credentialsProvider));
// 使用RestClientBuilder构建RestHighLevelClient对象
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
return restHighLevelClient;
}
}

@ -0,0 +1,47 @@
package com.mashibing.search.mq;
import com.mashibing.common.constant.RabbitMQConstant;
import com.mashibing.common.pojo.StandardSubmit;
import com.mashibing.common.utils.JsonUtil;
import com.mashibing.search.service.SearchService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
/**
* @author heqijun
* @ClassName: SmsWriteLogListener
* @Description:
* @date 2025/6/12 17:31
*/
@Slf4j
@Component
public class SmsWriteLogListener {
@Autowired
SearchService searchService;
private final String INDEX = "sms_submit_log_";
@RabbitListener(queues = {RabbitMQConstant.SMS_WRITE_LOG})
public void process(StandardSubmit submit, Channel channel, Message message) throws IOException {
log.info("接收到存储日志的信息submit={}", submit);
String sequenceId = submit.getSequenceId().toString();
String json = JsonUtil.obj2Json(submit);
String year = LocalDateTime.now().getYear() + "";
searchService.index(INDEX + year, sequenceId, json);
//手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

@ -0,0 +1,22 @@
package com.mashibing.search.service;
import java.io.IOException;
/**
* @author heqijun
* @ClassName: SearchService
* @Description: service
* @date 2025/6/12 18:00
*/
public interface SearchService {
/**
* es
*
* @param index
* @param id id
* @param json
*/
void index(String index, String id, String json) throws IOException;
}

@ -0,0 +1,52 @@
package com.mashibing.search.service.impl;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.SearchException;
import com.mashibing.search.service.SearchService;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* @author heqijun
* @ClassName: ElasticSearchServiceImpl
* @Description: service
* @date 2025/6/12 18:03
*/
@Slf4j
@Service
public class ElasticSearchServiceImpl implements SearchService {
private static String CREATED = "created";
@Autowired
RestHighLevelClient client;
@Override
public void index(String index, String id, String json) throws IOException {
//构建request
IndexRequest request = new IndexRequest();
request.index(index).id(id).source(json, XContentType.JSON);
//发送插入请求
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
String result = response.getResult().getLowercase();
if (!CREATED.equals(result)) {
//插入失败
log.error("【搜索模块-写入数据】失败index = {},id = {},json = {},result = {}", index, id, json, result);
throw new SearchException(ExceptionEnums.SEARCH_INDEX_ERROR);
}
log.info("【搜索模块-写入数据】成功index = {},id = {},json = {},result = {}", index, id, json, result);
}
}

@ -0,0 +1,28 @@
package com.mashibing.search.service.impl;
import com.mashibing.search.service.SearchService;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
@RunWith(SpringRunner.class)
class ElasticSearchServiceImplTest {
@Autowired
private SearchService service;
@Test
void index() throws IOException {
String index = "sms_submit_log_2023";
String id = "1";
String json = "{\"clientId\": 1}";
service.index(index, id, json);
}
}

@ -40,6 +40,7 @@ public class BlackClientStrategyFilter implements StrategyFilter {
String value = cacheClient.get(CacheConstant.BLACK + clientId + CacheConstant.COLON + mobile); String value = cacheClient.get(CacheConstant.BLACK + clientId + CacheConstant.COLON + mobile);
if (IN_BLACK.equals(value)) { if (IN_BLACK.equals(value)) {
log.info("【策略模块-客户黑名单校验】当前手机号:{}是客户黑名单!", mobile); log.info("【策略模块-客户黑名单校验】当前手机号:{}是客户黑名单!", mobile);
submit.setErrorMsg(ExceptionEnums.BLACK_CLIENT.getMsg());
strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME);
strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME);
throw new StrategyException(ExceptionEnums.BLACK_CLIENT); throw new StrategyException(ExceptionEnums.BLACK_CLIENT);

@ -40,6 +40,7 @@ public class BlackGlobalStrategyFilter implements StrategyFilter {
String value = cacheClient.get(CacheConstant.BLACK + mobile); String value = cacheClient.get(CacheConstant.BLACK + mobile);
if (IN_BLACK.equals(value)) { if (IN_BLACK.equals(value)) {
log.info("【策略模块-全局黑名单校验】当前手机号:{}是全局黑名单!", mobile); log.info("【策略模块-全局黑名单校验】当前手机号:{}是全局黑名单!", mobile);
submit.setErrorMsg(ExceptionEnums.BLACK_GLOBAL.getMsg());
strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME);
strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME);
throw new StrategyException(ExceptionEnums.BLACK_GLOBAL); throw new StrategyException(ExceptionEnums.BLACK_GLOBAL);

@ -48,6 +48,7 @@ public class FeeStrategyFilter implements StrategyFilter {
log.info("【策略模块-扣费校验】超出欠费额度!!!"); log.info("【策略模块-扣费校验】超出欠费额度!!!");
//加回去 //加回去
cacheClient.hincrby(key, filed, fee); cacheClient.hincrby(key, filed, fee);
submit.setErrorMsg(ExceptionEnums.BALANCE_NOT_ENOUGH.getMsg());
strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME);
strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME);
throw new StrategyException(ExceptionEnums.BALANCE_NOT_ENOUGH); throw new StrategyException(ExceptionEnums.BALANCE_NOT_ENOUGH);

@ -67,6 +67,7 @@ public class LimitHourStrategyFilter implements StrategyFilter {
//三次插入都失败,直接报错 //三次插入都失败,直接报错
if (retryCount == RETRY_MAX) { if (retryCount == RETRY_MAX) {
log.error("【策略模块-小时限流校验】达到限流阈值,校验失败!!!"); log.error("【策略模块-小时限流校验】达到限流阈值,校验失败!!!");
submit.setErrorMsg(ExceptionEnums.LIMIT_HOUR.getMsg());
strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME);
strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME);
throw new StrategyException(ExceptionEnums.LIMIT_HOUR); throw new StrategyException(ExceptionEnums.LIMIT_HOUR);

@ -54,6 +54,7 @@ public class LimitMinuteStrategyFilter implements StrategyFilter {
} else { } else {
//插入失败,报错 //插入失败,报错
log.error("【策略模块-分钟限流校验】达到限流阈值,校验失败!!!"); log.error("【策略模块-分钟限流校验】达到限流阈值,校验失败!!!");
submit.setErrorMsg(ExceptionEnums.LIMIT_MINUTE.getMsg());
strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsSendLog(submit, STRATEGY_NAME);
strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME); strategyCheckFailedUtil.smsPushReport(submit, STRATEGY_NAME);
throw new StrategyException(ExceptionEnums.LIMIT_MINUTE); throw new StrategyException(ExceptionEnums.LIMIT_MINUTE);

Loading…
Cancel
Save