限流器 优化

v1.4.1
Parker 4 years ago
parent 2ec4ef2d67
commit 219d1337ad

@ -2,13 +2,18 @@ package org.opsli.common.utils;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.http.HttpServletRequest;
import java.time.Duration;
import java.util.concurrent.ConcurrentMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @BelongsProject: think-bboss-parent
@ -22,35 +27,28 @@ public final class RateLimiterUtil {
/** 默认QPS */
public static final double DEFAULT_QPS = 10d;
/** 默认缓存个数 超出后流量自动清理 */
private static final int DEFAULT_CACHE_COUNT = 10_0000;
/** 默认缓存时效 超出后自动清理 */
private static final int DEFAULT_CACHE_TIME = 5;
/** 默认等待时长 */
private static final int DEFAULT_WAIT = 5000;
/** key-value (service,Qps) IP的限制速率 */
private static final ConcurrentMap<String,Double> IP_MAP;
/** userkey-service,limiter ,限制用户对接口的访问速率 */
private static final ConcurrentMap<String, RateLimiter> IP_LIMITER_MAP;
/** 限流器单机缓存 */
private static final Cache<String, Map<String, RateLimiterInner> > LFU_CACHE;
static{
IP_MAP = Maps.newConcurrentMap();
IP_LIMITER_MAP = Maps.newConcurrentMap();
LFU_CACHE = CacheBuilder
.newBuilder().maximumSize(DEFAULT_CACHE_COUNT)
.expireAfterWrite(DEFAULT_CACHE_TIME, TimeUnit.MINUTES).build();
}
/**
* IP QPS
* @param ip
* @param qps
*/
public static void updateIpQps(String ip, double qps) {
IP_MAP.put(ip,qps);
}
/**
* IP
* @param ip
*/
public static void removeIp(String ip) {
IP_MAP.remove(ip);
IP_LIMITER_MAP.remove(ip);
LFU_CACHE.invalidate(ip);
}
/**
@ -61,7 +59,9 @@ public final class RateLimiterUtil {
public static boolean enter(HttpServletRequest request) {
// 获得IP
String clientIpAddress = IPUtil.getClientIpAddress(request);
return RateLimiterUtil.enter(clientIpAddress);
// 获得URI
String clientURI = request.getRequestURI();
return RateLimiterUtil.enter(clientIpAddress, clientURI);
}
/**
@ -72,7 +72,9 @@ public final class RateLimiterUtil {
public static boolean enter(HttpServletRequest request, Double dfQps) {
// 获得IP
String clientIpAddress = IPUtil.getClientIpAddress(request);
return RateLimiterUtil.enter(clientIpAddress, dfQps);
// 获得URI
String clientURI = request.getRequestURI();
return RateLimiterUtil.enter(clientIpAddress, clientURI, dfQps);
}
/**
@ -80,8 +82,8 @@ public final class RateLimiterUtil {
* @param clientIpAddress IP
* @return
*/
public static boolean enter(String clientIpAddress) {
return RateLimiterUtil.enter(clientIpAddress, null);
public static boolean enter(String clientIpAddress, String resource) {
return RateLimiterUtil.enter(clientIpAddress, resource, null);
}
/**
@ -90,58 +92,99 @@ public final class RateLimiterUtil {
* @param dfQps QPS
* @return
*/
public static boolean enter(String clientIpAddress, Double dfQps) {
public static boolean enter(String clientIpAddress, String resource, Double dfQps) {
// 计时器
TimeInterval timer = DateUtil.timer();
Double qps = IP_MAP.get(clientIpAddress);
// 如果当前MAP IP为空 且指派IP不为空 则自动赋值
if(qps == null && dfQps != null){
RateLimiterUtil.updateIpQps(clientIpAddress, dfQps);
qps = dfQps;
Map<String, RateLimiterInner> rateLimiterInnerMap;
try {
rateLimiterInnerMap = LFU_CACHE.get(clientIpAddress, ()->{
// 当缓存取不到时 重新加载缓存
Map<String, RateLimiterInner> tmpMap = Maps.newConcurrentMap();
// 设置限流器
RateLimiterInner rateLimiterInner = new RateLimiterInner();
rateLimiterInner.setQps(dfQps);
rateLimiterInner.setRateLimiter(RateLimiter.create(dfQps));
tmpMap.put(resource, rateLimiterInner);
return tmpMap;
});
}catch (ExecutionException e){
log.error(e.getMessage(), e);
return false;
}
//不限流
if (qps == null || qps == 0.0) {
return true;
}
RateLimiterInner rateLimiterObj;
RateLimiter rateLimiter = IP_LIMITER_MAP.get(clientIpAddress);
Double qps = dfQps;
// 初始化过程
RateLimiterInner rateLimiterInner = rateLimiterInnerMap.get(resource);
// 如果为空 则创建一个新的限流器
if (rateLimiter == null) {
rateLimiter = RateLimiter.create(qps);
if(rateLimiterInner == null){
System.out.println(456);
rateLimiterInner = new RateLimiterInner();
rateLimiterInner.setQps(dfQps);
rateLimiterInner.setRateLimiter(RateLimiter.create(dfQps));
rateLimiterInnerMap.put(resource, rateLimiterInner);
RateLimiter putByOtherThread = IP_LIMITER_MAP.putIfAbsent(clientIpAddress, rateLimiter);
if (putByOtherThread != null) {
rateLimiter = putByOtherThread;
}else{
qps = rateLimiterInner.getQps();
}
rateLimiter.setRate(qps);
rateLimiterObj = rateLimiterInner;
//不限流
if (qps == null || qps == 0.0) {
return true;
}
RateLimiter rateLimiter = rateLimiterObj.getRateLimiter();
//非阻塞
if (!rateLimiter.tryAcquire(Duration.ofMillis(DEFAULT_WAIT))) {
// 花费毫秒数
long timerCount = timer.interval();
//限速中,提示用户
log.info("限流器 耗时: "+ timerCount + "ms, 访问过频繁: " + clientIpAddress);
log.error("限流器 - 访问频繁 耗时: "+ timerCount + "ms, IP地址: " + clientIpAddress + ", URI: " + resource);
return false;
} else {
// 正常访问
// 花费毫秒数
long timerCount = timer.interval();
//正常访问
log.info("限流器 耗时: "+ timerCount + "ms");
return true;
}
}
// ==============
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
RateLimiterUtil.removeIp("127.0.0.1");
for (int i = 0; i < 1000; i++) {
int j = i;
new Thread(()->{
boolean enter = RateLimiterUtil.enter("127.0.0.1", RateLimiterUtil.DEFAULT_QPS);
boolean enter = RateLimiterUtil.enter("127.0.0.1","/api/v1", RateLimiterUtil.DEFAULT_QPS);
System.out.println(enter);
}).start();
}
}
}
/**
*
*/
@Data
class RateLimiterInner {
/** qps */
private Double qps;
/** 限流器 */
private RateLimiter rateLimiter;
}

Loading…
Cancel
Save