限流操作、扣费操作、路由策略(数据同步)

master
Administrator 2 years ago
parent 29ee4f7ad6
commit b51e28ec74

@ -4,6 +4,7 @@ import com.msb.framework.redis.RedisClient;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -106,6 +107,42 @@ public class CacheController {
return value; return value;
} }
@PostMapping(value = "/cache/zadd/{key}/{score}/{member}")
public Boolean zadd(@PathVariable(value = "key")String key,
@PathVariable(value = "score")Long score,
@PathVariable(value = "member")Object member){
log.info("【缓存模块】 zaddLong方法存储key = {}存储score = {}存储value = {}", key,score, member);
Boolean result = redisClient.zAdd(key, member, score);
return result;
}
@GetMapping(value = "/cache/zrangebyscorecount/{key}/{start}/{end}")
public int zRangeByScoreCount(@PathVariable(value = "key") String key,
@PathVariable(value = "start") Double start,
@PathVariable(value = "end") Double end) {
log.info("【缓存模块】 zRangeByScoreCount方法查询key = {},start = {},end = {}", key,start,end);
Set<ZSetOperations.TypedTuple<Object>> values = redisTemplate.opsForZSet().rangeByScoreWithScores(key, start, end);
if(values != null){
return values.size();
}
return 0;
}
@DeleteMapping(value = "/cache/zremove/{key}/{member}")
public void zRemove(@PathVariable(value = "key") String key,@PathVariable(value = "member") String member) {
log.info("【缓存模块】 zRemove方法删除key = {},member = {}", key,member);
redisClient.zRemove(key,member);
}
@PostMapping(value = "/cache/hincrby/{key}/{field}/{delta}")
public Long hIncrBy(@PathVariable(value = "key") String key,
@PathVariable(value = "field") String field,
@PathVariable(value = "delta") Long delta){
log.info("【缓存模块】 hIncrBy方法自增 key = {},field = {}number = {}", key,field,delta);
Long result = redisClient.incrementMap(key, field, delta);
log.info("【缓存模块】 hIncrBy方法自增 key = {},field = {}number = {},剩余数值为 = {}", key,field,delta,result);
return result;
}
} }

@ -48,5 +48,15 @@ public interface CacheConstant {
*/ */
String TRANSFER = "transfer:"; String TRANSFER = "transfer:";
/**
* 1key
*/
String LIMIT_MINUTES = "limit:minutes:";
/**
* 1key
*/
String LIMIT_HOURS = "limit:hours:";
} }

@ -17,4 +17,19 @@ public interface SmsConstant {
*/ */
int REPORT_FAIL = 2; int REPORT_FAIL = 2;
/**
*
*/
int CODE_TYPE = 0;
/**
*
*/
int NOTIFY_TYPE = 1;
/**
*
*/
int MARKETING_TYPE = 2;
} }

@ -21,6 +21,8 @@ public enum ExceptionEnums {
HAVE_DIRTY_WORD(-13,"当前短信内容中包含敏感词信息!"), HAVE_DIRTY_WORD(-13,"当前短信内容中包含敏感词信息!"),
BLACK_GLOBAL(-14,"当前手机号为平台黑名单!"), BLACK_GLOBAL(-14,"当前手机号为平台黑名单!"),
BLACK_CLIENT(-15,"当前手机号为客户黑名单!"), BLACK_CLIENT(-15,"当前手机号为客户黑名单!"),
ONE_MINUTE_LIMIT(-16,"1分钟限流规则生效无法发送短信"),
ONE_HOUR_LIMIT(-17,"1小时限流规则生效无法发送短信"),
; ;
private Integer code; private Integer code;

@ -6,6 +6,7 @@ import lombok.NoArgsConstructor;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset;
/** /**
* --POJO * --POJO
@ -58,6 +59,7 @@ public class StandardSubmit implements Serializable {
*/ */
private LocalDateTime sendTime; private LocalDateTime sendTime;
/** /**
* 7067, * 7067,
*/ */
@ -125,6 +127,11 @@ public class StandardSubmit implements Serializable {
*/ */
private Boolean isTransfer = false; private Boolean isTransfer = false;
/**
* 1
*/
private Long oneHourLimitMilli;
// 后续再做封装~~~~ // 后续再做封装~~~~

@ -1,10 +1,7 @@
package com.mashibing.strategy.client; package com.mashibing.strategy.client;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.Set; import java.util.Set;
@ -29,4 +26,22 @@ public interface BeaconCacheClient {
@GetMapping("/cache/smember/{key}") @GetMapping("/cache/smember/{key}")
Set<String> smember(@PathVariable(value = "key")String key); Set<String> smember(@PathVariable(value = "key")String key);
@PostMapping(value = "/cache/zadd/{key}/{score}/{member}")
Boolean zadd(@PathVariable(value = "key")String key,
@PathVariable(value = "score")Long score,
@PathVariable(value = "member")Object member);
@GetMapping(value = "/cache/zrangebyscorecount/{key}/{start}/{end}")
int zRangeByScoreCount(@PathVariable(value = "key") String key,
@PathVariable(value = "start") Double start,
@PathVariable(value = "end") Double end);
@DeleteMapping(value = "/cache/zremove/{key}/{member}")
void zRemove(@PathVariable(value = "key") String key,@PathVariable(value = "member") String member);
@PostMapping(value = "/cache/hincrby/{key}/{field}/{delta}")
Long hIncrBy(@PathVariable(value = "key") String key,
@PathVariable(value = "field") String field,
@PathVariable(value = "delta") Long delta);
} }

@ -0,0 +1,56 @@
package com.mashibing.strategy.filter.impl;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.StrategyException;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.strategy.client.BeaconCacheClient;
import com.mashibing.strategy.filter.StrategyFilter;
import com.mashibing.strategy.util.ClientBalanceUtil;
import com.mashibing.strategy.util.ErrorSendMsgUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
*
* @author zjw
* @description
*/
@Service(value = "fee")
@Slf4j
public class FeeStrategyFilter implements StrategyFilter {
@Autowired
private BeaconCacheClient cacheClient;
@Autowired
private ErrorSendMsgUtil sendMsgUtil;
private final String BALANCE = "balance";
@Override
public void strategy(StandardSubmit submit) {
log.info("【策略模块-扣费校验】 校验ing…………");
//1、获取submit中封装的金额
Long fee = submit.getFee();
Long clientId = submit.getClientId();
//2、调用Redis的decr扣减具体的金额
Long amount = cacheClient.hIncrBy(CacheConstant.CLIENT_BALANCE + clientId, BALANCE, -fee);
//3、获取当前客户的欠费金额的限制外部方法调用暂时写死为10000厘
Long amountLimit = ClientBalanceUtil.getClientAmountLimit(submit.getClientId());
//4、判断扣减过后的金额是否超出了金额限制
if(amount < amountLimit) {
log.info("【策略模块-扣费校验】 扣除费用后,超过欠费余额的限制,无法发送短信!!");
//5、如果超过了需要将扣除的费用增加回去并且做后续处理
cacheClient.hIncrBy(CacheConstant.CLIENT_BALANCE + clientId, BALANCE, fee);
submit.setErrorMsg(ExceptionEnums.BALANCE_NOT_ENOUGH.getMsg());
sendMsgUtil.sendWriteLog(submit);
sendMsgUtil.sendPushReport(submit);
throw new StrategyException(ExceptionEnums.BALANCE_NOT_ENOUGH);
}
log.info("【策略模块-扣费校验】 扣费成功!!");
}
}

@ -0,0 +1,93 @@
package com.mashibing.strategy.filter.impl;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.constant.SmsConstant;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.StrategyException;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.strategy.client.BeaconCacheClient;
import com.mashibing.strategy.filter.StrategyFilter;
import com.mashibing.strategy.util.ErrorSendMsgUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
/**
* 13
* @author zjw
* @description
*/
@Service("limitOneHour")
@Slf4j
public class LimitOneHourStrategyFilter implements StrategyFilter {
private final String UTC = "+8";
private final long ONE_HOUR = 60 * 1000 * 60 - 1;
private final int RETRY_COUNT = 2;
private final int LIMIT_HOUR = 3;
@Autowired
private BeaconCacheClient cacheClient;
@Autowired
private ErrorSendMsgUtil sendMsgUtil;
@Override
public void strategy(StandardSubmit submit) {
if(submit.getState() != SmsConstant.CODE_TYPE){
return;
}
log.info("【策略模块-一小时限流策略】 开始校验ing………………");
//1、基于submit获取短信的发送时间
LocalDateTime sendTime = submit.getSendTime();
//2、基于LocalDateTime获取到时间的毫秒值
long sendTimeMilli = sendTime.toInstant(ZoneOffset.of(UTC)).toEpochMilli();
submit.setOneHourLimitMilli(sendTimeMilli);
//3、基于submit获取客户标识以及手机号信息
Long clientId = submit.getClientId();
String mobile = submit.getMobile();
//4、优先将当前短信发送信息插入到Redis的ZSet结构中 zadd
String key = CacheConstant.LIMIT_HOURS + clientId + CacheConstant.SEPARATE + mobile;
//5、如果插入失败需要重新的将毫秒值做改变尝试重新插入
int retry = 0;
while(!cacheClient.zadd(key, submit.getOneHourLimitMilli(), submit.getOneHourLimitMilli())){
// 发送失败,尝试重试
if(retry == RETRY_COUNT) break;
retry++;
// 插入失败是因为存储的member不允许重复既然重复了将时间向后移动移动到当前系统时间
submit.setOneHourLimitMilli(System.currentTimeMillis());
}
// 如果retry为2代表已经重试了2次但是依然没有成功
if(retry == RETRY_COUNT){
log.info("【策略模块-一小时限流策略】 插入失败! 满足一小时限流规则,无法发送!");
submit.setErrorMsg(ExceptionEnums.ONE_HOUR_LIMIT + ",mobile = " + mobile);
sendMsgUtil.sendWriteLog(submit);
sendMsgUtil.sendPushReport(submit);
throw new StrategyException(ExceptionEnums.ONE_HOUR_LIMIT);
}
// 没有重试2次3次之内将数据正常的插入了。基于zrangebyscore做范围查询
long start = submit.getOneHourLimitMilli() - ONE_HOUR;
int count = cacheClient.zRangeByScoreCount(key, Double.parseDouble(start + ""), Double.parseDouble(submit.getOneHourLimitMilli() + ""));
if(count > LIMIT_HOUR){
log.info("【策略模块-一小时限流策略】 插入失败! 满足一小时限流规则,无法发送!");
cacheClient.zRemove(key,submit.getOneHourLimitMilli() + "");
submit.setErrorMsg(ExceptionEnums.ONE_HOUR_LIMIT + ",mobile = " + mobile);
sendMsgUtil.sendWriteLog(submit);
sendMsgUtil.sendPushReport(submit);
throw new StrategyException(ExceptionEnums.ONE_HOUR_LIMIT);
}
log.info("【策略模块-一小时限流策略】 一小时限流规则通过,可以发送!");
}
}

@ -0,0 +1,84 @@
package com.mashibing.strategy.filter.impl;
import com.mashibing.common.constant.CacheConstant;
import com.mashibing.common.constant.SmsConstant;
import com.mashibing.common.enums.ExceptionEnums;
import com.mashibing.common.exception.StrategyException;
import com.mashibing.common.model.StandardSubmit;
import com.mashibing.strategy.client.BeaconCacheClient;
import com.mashibing.strategy.filter.StrategyFilter;
import com.mashibing.strategy.util.ErrorSendMsgUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.weaver.tools.cache.CacheStatistics;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
/**
* 11
* @author zjw
* @description
*/
@Service("limitOneMinute")
@Slf4j
public class LimitOneMinuteStrategyFilter implements StrategyFilter {
private final String UTC = "+8";
private final long ONE_MINUTE = 60 * 1000 - 1;
@Autowired
private BeaconCacheClient cacheClient;
@Autowired
private ErrorSendMsgUtil sendMsgUtil;
@Override
public void strategy(StandardSubmit submit) {
if(submit.getState() != SmsConstant.CODE_TYPE){
return;
}
log.info("【策略模块-一分钟限流策略】 开始校验ing………………");
//1、基于submit获取短信的发送时间
LocalDateTime sendTime = submit.getSendTime();
//2、基于LocalDateTime获取到时间的毫秒值
long sendTimeMilli = sendTime.toInstant(ZoneOffset.of(UTC)).toEpochMilli();
//3、基于submit获取客户标识以及手机号信息
Long clientId = submit.getClientId();
String mobile = submit.getMobile();
//4、优先将当前短信发送信息插入到Redis的ZSet结构中 zadd
String key = CacheConstant.LIMIT_MINUTES + clientId + CacheConstant.SEPARATE + mobile;
Boolean addOk = cacheClient.zadd(key, sendTimeMilli, sendTimeMilli);
//5、如果查询失败直接告辞有并发情况60s不能发送两条直接告辞
if(!addOk){
log.info("【策略模块-一分钟限流策略】 插入失败! 满足一分钟限流规则,无法发送!");
submit.setErrorMsg(ExceptionEnums.ONE_MINUTE_LIMIT + ",mobile = " + mobile);
sendMsgUtil.sendWriteLog(submit);
sendMsgUtil.sendPushReport(submit);
throw new StrategyException(ExceptionEnums.ONE_MINUTE_LIMIT);
}
//6、基于zrangebyscore查询1分钟直接是否只有当前查询的发送短信信息
long start = sendTimeMilli - ONE_MINUTE;
int count = cacheClient.zRangeByScoreCount(key, Double.parseDouble(start + ""), Double.parseDouble(sendTimeMilli + ""));
//7、如果大于等于2条短信信息达到了60s一条的短信限流规则直接告辞。
if(count > 1){
// 一分钟之前,发送过短信,限流规则生效
log.info("【策略模块-一分钟限流策略】 插入失败! 满足一分钟限流规则,无法发送!");
cacheClient.zRemove(key,sendTimeMilli + "");
submit.setErrorMsg(ExceptionEnums.ONE_MINUTE_LIMIT + ",mobile = " + mobile);
sendMsgUtil.sendWriteLog(submit);
sendMsgUtil.sendPushReport(submit);
throw new StrategyException(ExceptionEnums.ONE_MINUTE_LIMIT);
}
log.info("【策略模块-一分钟限流策略】 一分钟限流规则通过,可以发送!");
}
}

@ -0,0 +1,19 @@
package com.mashibing.strategy.util;
/**
* @author zjw
* @description
*/
public class ClientBalanceUtil {
/**
*
* @param clientId
* @return
*/
public static Long getClientAmountLimit(Long clientId){
return -10000L;
}
}

@ -0,0 +1,125 @@
package com.mashibing.test.entity;
/**
* @author zjw
* @description
*/
public class Channel {
private Long id;
private String channelName;
private Integer channelType;
private String channelArea;
private String channelAreaCode;
private Long channelPrice;
private Integer channelProtocal;
private String channelIp;
private Integer channelPort;
private String channelUsername;
private String channelPassword;
private String channelNumber;
private Integer isAvailable;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getChannelName() {
return channelName;
}
public void setChannelName(String channelName) {
this.channelName = channelName;
}
public Integer getChannelType() {
return channelType;
}
public void setChannelType(Integer channelType) {
this.channelType = channelType;
}
public String getChannelArea() {
return channelArea;
}
public void setChannelArea(String channelArea) {
this.channelArea = channelArea;
}
public String getChannelAreaCode() {
return channelAreaCode;
}
public void setChannelAreaCode(String channelAreaCode) {
this.channelAreaCode = channelAreaCode;
}
public Long getChannelPrice() {
return channelPrice;
}
public void setChannelPrice(Long channelPrice) {
this.channelPrice = channelPrice;
}
public Integer getChannelProtocal() {
return channelProtocal;
}
public void setChannelProtocal(Integer channelProtocal) {
this.channelProtocal = channelProtocal;
}
public String getChannelIp() {
return channelIp;
}
public void setChannelIp(String channelIp) {
this.channelIp = channelIp;
}
public Integer getChannelPort() {
return channelPort;
}
public void setChannelPort(Integer channelPort) {
this.channelPort = channelPort;
}
public String getChannelUsername() {
return channelUsername;
}
public void setChannelUsername(String channelUsername) {
this.channelUsername = channelUsername;
}
public String getChannelPassword() {
return channelPassword;
}
public void setChannelPassword(String channelPassword) {
this.channelPassword = channelPassword;
}
public String getChannelNumber() {
return channelNumber;
}
public void setChannelNumber(String channelNumber) {
this.channelNumber = channelNumber;
}
public Integer getIsAvailable() {
return isAvailable;
}
public void setIsAvailable(Integer isAvailable) {
this.isAvailable = isAvailable;
}
}

@ -0,0 +1,54 @@
package com.mashibing.test.entity;
/**
* @author zjw
* @description
*/
public class ClientChannel {
private Long clientId;
private Long channelId;
private Integer clientChannelWeight;
private String clientChannelNumber;
private Integer isAvailable;
public Long getClientId() {
return clientId;
}
public void setClientId(Long clientId) {
this.clientId = clientId;
}
public Long getChannelId() {
return channelId;
}
public void setChannelId(Long channelId) {
this.channelId = channelId;
}
public Integer getClientChannelWeight() {
return clientChannelWeight;
}
public void setClientChannelWeight(Integer clientChannelWeight) {
this.clientChannelWeight = clientChannelWeight;
}
public String getClientChannelNumber() {
return clientChannelNumber;
}
public void setClientChannelNumber(String clientChannelNumber) {
this.clientChannelNumber = clientChannelNumber;
}
public Integer getIsAvailable() {
return isAvailable;
}
public void setIsAvailable(Integer isAvailable) {
this.isAvailable = isAvailable;
}
}

@ -0,0 +1,19 @@
package com.mashibing.test.mapper;
import com.mashibing.test.entity.Channel;
import com.mashibing.test.entity.ClientBalance;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
* @author zjw
* @description
*/
public interface ChannelMapper {
@Select("select * from channel where is_delete = 0")
List<Channel> findAll();
}

@ -0,0 +1,18 @@
package com.mashibing.test.mapper;
import com.mashibing.test.entity.Channel;
import com.mashibing.test.entity.ClientChannel;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
* @author zjw
* @description
*/
public interface ClientChannelMapper {
@Select("select * from client_channel where is_delete = 0")
List<ClientChannel> findAll();
}

@ -0,0 +1,37 @@
package com.mashibing.test.mapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mashibing.test.client.CacheClient;
import com.mashibing.test.entity.Channel;
import com.mashibing.test.entity.MobileBlack;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.Map;
@SpringBootTest
@RunWith(SpringRunner.class)
public class ChannelMapperTest {
@Autowired
private ChannelMapper mapper;
@Autowired
private CacheClient cacheClient;
@Test
public void findAll() throws JsonProcessingException {
List<Channel> list = mapper.findAll();
for (Channel channel : list) {
ObjectMapper objectMapper = new ObjectMapper();
Map map = objectMapper.readValue(objectMapper.writeValueAsString(channel), Map.class);
cacheClient.hmset("channel:" + channel.getId(),map);
}
}
}

@ -0,0 +1,37 @@
package com.mashibing.test.mapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mashibing.test.client.CacheClient;
import com.mashibing.test.entity.Channel;
import com.mashibing.test.entity.ClientChannel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.Map;
@SpringBootTest
@RunWith(SpringRunner.class)
public class ClientChannelMapperTest {
@Autowired
private ClientChannelMapper mapper;
@Autowired
private CacheClient cacheClient;
@Test
public void findAll() throws JsonProcessingException {
List<ClientChannel> list = mapper.findAll();
for (ClientChannel clientChannel : list) {
ObjectMapper objectMapper = new ObjectMapper();
Map map = objectMapper.readValue(objectMapper.writeValueAsString(clientChannel), Map.class);
cacheClient.sadd("client_channel:" + clientChannel.getClientId(),map);
}
}
}
Loading…
Cancel
Save