Merge branch 'master' of https://gitee.com/caolongxiu/austin into clxpr

pull/6/head
3y 3 years ago
commit 516914eb46

@ -0,0 +1,36 @@
package com.java3y.austin.handler.deduplication.limit;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService;
import java.util.*;
/**
* @author cao
* @date 2022-04-20 12:00
*/
public abstract class AbstractLimitService implements LimitService {
/**
* Key
*
* @param taskInfo
* @return
*/
protected List<String> deduplicationAllKey(AbstractDeduplicationService service, TaskInfo taskInfo) {
List<String> result = new ArrayList<>(taskInfo.getReceiver().size());
for (String receiver : taskInfo.getReceiver()) {
String key = deduplicationSingleKey(service, taskInfo, receiver);
result.add(key);
}
return result;
}
protected String deduplicationSingleKey(AbstractDeduplicationService service, TaskInfo taskInfo, String receiver) {
return service.deduplicationSingleKey(taskInfo, receiver);
}
}

@ -0,0 +1,24 @@
package com.java3y.austin.handler.deduplication.limit;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService;
import java.util.Set;
/**
* @author cao
* @date 2022-04-20 11:58
*/
public interface LimitService {
/**
* @param service
* @param taskInfo
* @param param
* @return
*/
Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param);
}

@ -0,0 +1,76 @@
package com.java3y.austin.handler.deduplication.limit;
import cn.hutool.core.collection.CollUtil;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService;
import com.java3y.austin.support.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author cao
* @date 2022-04-20 13:41
*/
@Service(value = "SimpleLimitService")
public class SimpleLimitService extends AbstractLimitService {
private static final String LIMIT_TAG = "SP_";
@Autowired
private RedisUtils redisUtils;
public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) {
Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
// 获取redis记录
Map<String, String> readyPutRedisReceiver = new HashMap(taskInfo.getReceiver().size());
//redis数据隔离
List<String> keys = deduplicationAllKey(service, taskInfo).stream().map(key -> LIMIT_TAG + key).collect(Collectors.toList());
Map<String, String> inRedisValue = redisUtils.mGet(keys);
for (String receiver : taskInfo.getReceiver()) {
String key = LIMIT_TAG + deduplicationSingleKey(service, taskInfo, receiver);
String value = inRedisValue.get(key);
// 符合条件的用户
if (value != null && Integer.parseInt(value) >= param.getCountNum()) {
filterReceiver.add(receiver);
} else {
readyPutRedisReceiver.put(receiver, key);
}
}
// 不符合条件的用户需要更新Redis(无记录添加,有记录则累加次数)
putInRedis(readyPutRedisReceiver, inRedisValue, param.getDeduplicationTime());
return filterReceiver;
}
/**
* redis
*
* @param readyPutRedisReceiver
*/
private void putInRedis(Map<String, String> readyPutRedisReceiver,
Map<String, String> inRedisValue, Long deduplicationTime) {
Map<String, String> keyValues = new HashMap<>(readyPutRedisReceiver.size());
for (Map.Entry<String, String> entry : readyPutRedisReceiver.entrySet()) {
String key = entry.getValue();
if (inRedisValue.get(key) != null) {
keyValues.put(key, String.valueOf(Integer.valueOf(inRedisValue.get(key)) + 1));
} else {
keyValues.put(key, String.valueOf(AustinConstant.TRUE));
}
}
if (CollUtil.isNotEmpty(keyValues)) {
redisUtils.pipelineSetEx(keyValues, deduplicationTime);
}
}
}

@ -0,0 +1,68 @@
package com.java3y.austin.handler.deduplication.limit;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService;
import com.java3y.austin.support.utils.RedisUtils;
import com.java3y.austin.support.utils.SnowFlakeIdUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* @author cao
* @date 2022-04-20 11:34
*/
@Service(value = "SlideWindowLimitService")
public class SlideWindowLimitService extends AbstractLimitService {
private static final String LIMIT_TAG = "SW_";
@Autowired
private RedisUtils redisUtils;
private SnowFlakeIdUtils snowFlakeIdUtils = new SnowFlakeIdUtils(1, 1);
private DefaultRedisScript<Long> redisScript;
@PostConstruct
public void init() {
redisScript = new DefaultRedisScript();
redisScript.setResultType(Long.class);
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("limit.lua")));
}
/**
* @param service
* @param taskInfo
* @param param
* @return
*/
@Override
public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) {
Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
long nowTime = System.currentTimeMillis();
for (String receiver : taskInfo.getReceiver()) {
String key = LIMIT_TAG + deduplicationSingleKey(service, taskInfo, receiver);
String scoreValue = String.valueOf(snowFlakeIdUtils.nextId());
String score = String.valueOf(nowTime);
if (redisUtils.execLimitLua(redisScript, Arrays.asList(key), String.valueOf(param.getDeduplicationTime() * 1000), score, String.valueOf(param.getCountNum()), scoreValue)) {
filterReceiver.add(receiver);
}
}
return filterReceiver;
}
}

@ -1,18 +1,16 @@
package com.java3y.austin.handler.deduplication.service;
import cn.hutool.core.collection.CollUtil;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationHolder;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import com.java3y.austin.handler.deduplication.limit.LimitService;
import com.java3y.austin.support.utils.LogUtils;
import com.java3y.austin.support.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.*;
/**
@ -22,8 +20,11 @@ import java.util.*;
*/
@Slf4j
public abstract class AbstractDeduplicationService implements DeduplicationService {
protected Integer deduplicationType;
protected LimitService limitService;
@Autowired
private DeduplicationHolder deduplicationHolder;
@ -32,8 +33,6 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi
deduplicationHolder.putService(deduplicationType, this);
}
@Autowired
private RedisUtils redisUtils;
@Autowired
private LogUtils logUtils;
@ -41,27 +40,8 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi
@Override
public void deduplication(DeduplicationParam param) {
TaskInfo taskInfo = param.getTaskInfo();
Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
// 获取redis记录
Set<String> readyPutRedisReceiver = new HashSet<>(taskInfo.getReceiver().size());
List<String> keys = deduplicationAllKey(taskInfo);
Map<String, String> inRedisValue = redisUtils.mGet(keys);
for (String receiver : taskInfo.getReceiver()) {
String key = deduplicationSingleKey(taskInfo, receiver);
String value = inRedisValue.get(key);
// 符合条件的用户
if (value != null && Integer.parseInt(value) >= param.getCountNum()) {
filterReceiver.add(receiver);
} else {
readyPutRedisReceiver.add(receiver);
}
}
// 不符合条件的用户需要更新Redis(无记录添加,有记录则累加次数)
putInRedis(readyPutRedisReceiver, inRedisValue, param);
Set<String> filterReceiver = limitService.limitFilter(this, taskInfo, param);
// 剔除符合去重条件的用户
if (CollUtil.isNotEmpty(filterReceiver)) {
@ -78,43 +58,7 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi
* @param receiver
* @return
*/
protected abstract String deduplicationSingleKey(TaskInfo taskInfo, String receiver);
public abstract String deduplicationSingleKey(TaskInfo taskInfo, String receiver);
/**
* redis
*
* @param readyPutRedisReceiver
*/
private void putInRedis(Set<String> readyPutRedisReceiver,
Map<String, String> inRedisValue, DeduplicationParam param) {
Map<String, String> keyValues = new HashMap<>(readyPutRedisReceiver.size());
for (String receiver : readyPutRedisReceiver) {
String key = deduplicationSingleKey(param.getTaskInfo(), receiver);
if (inRedisValue.get(key) != null) {
keyValues.put(key, String.valueOf(Integer.valueOf(inRedisValue.get(key)) + 1));
} else {
keyValues.put(key, String.valueOf(AustinConstant.TRUE));
}
}
if (CollUtil.isNotEmpty(keyValues)) {
redisUtils.pipelineSetEx(keyValues, param.getDeduplicationTime());
}
}
/**
* Key
*
* @param taskInfo
* @return
*/
private List<String> deduplicationAllKey(TaskInfo taskInfo) {
List<String> result = new ArrayList<>(taskInfo.getReceiver().size());
for (String receiver : taskInfo.getReceiver()) {
String key = deduplicationSingleKey(taskInfo, receiver);
result.add(key);
}
return result;
}
}

@ -4,6 +4,9 @@ import cn.hutool.crypto.digest.DigestUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.deduplication.limit.LimitService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
/**
@ -14,7 +17,10 @@ import org.springframework.stereotype.Service;
@Service
public class ContentDeduplicationService extends AbstractDeduplicationService {
public ContentDeduplicationService() {
@Autowired
public ContentDeduplicationService(@Qualifier("SlideWindowLimitService") LimitService limitService) {
this.limitService = limitService;
deduplicationType = DeduplicationType.CONTENT.getCode();
}

@ -3,8 +3,12 @@ package com.java3y.austin.handler.deduplication.service;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.deduplication.limit.LimitService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
/**
* @author 3y
* @date 2021/12/12
@ -13,8 +17,13 @@ import org.springframework.stereotype.Service;
@Service
public class FrequencyDeduplicationService extends AbstractDeduplicationService {
public FrequencyDeduplicationService() {
@Autowired
public FrequencyDeduplicationService(@Qualifier("SimpleLimitService") LimitService limitService) {
this.limitService = limitService;
deduplicationType = DeduplicationType.FREQUENCY.getCode();
}
private static final String PREFIX = "FRE";
@ -33,7 +42,7 @@ public class FrequencyDeduplicationService extends AbstractDeduplicationService
@Override
public String deduplicationSingleKey(TaskInfo taskInfo, String receiver) {
return PREFIX + StrUtil.C_UNDERLINE
+ receiver + StrUtil.C_UNDERLINE
+ receiver + StrUtil.C_UNDERLINE
+ taskInfo.getMessageTemplateId() + StrUtil.C_UNDERLINE
+ taskInfo.getSendChannel();
}

@ -2,10 +2,12 @@ package com.java3y.austin.support.utils;
import cn.hutool.core.collection.CollUtil;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.AustinConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@ -93,7 +95,6 @@ public class RedisUtils {
/**
* lpush
*
*/
public void lPush(String key, String value, Long seconds) {
try {
@ -109,7 +110,6 @@ public class RedisUtils {
/**
* lLen
*
*/
public Long lLen(String key) {
try {
@ -119,9 +119,9 @@ public class RedisUtils {
}
return 0L;
}
/**
* lPop
*
*/
public String lPop(String key) {
try {
@ -151,4 +151,33 @@ public class RedisUtils {
log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e));
}
}
/**
* lua
* --KEYS[1]: key
* --ARGV[1]:
* --ARGV[2]: score
* --ARGV[3]:
* --ARGV[4]: score value
*
* @param redisScript
* @param keys
* @param args
* @return
*/
public Boolean execLimitLua(RedisScript<Long> redisScript, List<String> keys, String... args) {
try {
Long execute = redisTemplate.execute(redisScript, keys, args);
return AustinConstant.TRUE.equals(execute.intValue());
} catch (Exception e) {
log.error("redis execLimitLua fail! e:{}", Throwables.getStackTraceAsString(e));
}
return false;
}
}

@ -0,0 +1,108 @@
package com.java3y.austin.support.utils;
/**
* id
*
* @author cao
* @date 2022-04-20 13:12
*/
public class SnowFlakeIdUtils {
/**
* (2017-01-01)
*/
private final static long START_TIMESTAMP = 1483200000000L;
/**
*
*/
private final static long SEQUENCE_BIT = 12; //***占用的位数
private final static long MACHINE_BIT = 5; //机器标识占用的位数
private final static long DATA_CENTER_BIT = 5; //数据中心占用的位数
/**
*
*/
private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
private final static long MAX_DATA_CENTER_NUM = -1L ^ (-1L << DATA_CENTER_BIT);
/**
*
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
private long dataCenterId; //数据中心
private long machineId; //机器标识
private long sequence = 0L; //***
private long lastTimeStamp = -1L; //上一次时间戳
/**
* IDID***
*
* @param dataCenterId ID
* @param machineId ID
*/
public SnowFlakeIdUtils(long dataCenterId, long machineId) {
if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("DtaCenterId 不能大于 %d 或小于 0", MAX_DATA_CENTER_NUM));
}
if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException(String.format("MachineId 不能大于 %d 或小于 0", MAX_MACHINE_NUM));
}
this.dataCenterId = dataCenterId;
this.machineId = machineId;
}
/**
* ID
*
* @return
*/
public synchronized long nextId() {
long currTimeStamp = System.currentTimeMillis();
if (currTimeStamp < lastTimeStamp) {
throw new RuntimeException("当前时间小于上一次记录的时间戳!");
}
if (currTimeStamp == lastTimeStamp) {
//相同毫秒内,***自增
sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列数已经达到最大
if (sequence == 0L) {
currTimeStamp = getNextMill();
}
} else {
//不同毫秒内,***置为0
sequence = 0L;
}
lastTimeStamp = currTimeStamp;
return (currTimeStamp - START_TIMESTAMP) << TIMESTAMP_LEFT //时间戳部分
| dataCenterId << DATA_CENTER_LEFT //数据中心部分
| machineId << MACHINE_LEFT //机器标识部分
| sequence; //***部分
}
/**
*
*
* @return
*/
private long getNextMill() {
long mill = System.currentTimeMillis();
while (mill <= lastTimeStamp) {
mill = System.currentTimeMillis();
}
return mill;
}
}

@ -0,0 +1,17 @@
--KEYS[1]: 限流 key
--ARGV[1]: 限流窗口,毫秒
--ARGV[2]: 当前时间戳作为score
--ARGV[3]: 阈值
--ARGV[4]: score 对应的唯一value
-- 1\. 移除开始时间窗口之前的数据
redis.call('zremrangeByScore', KEYS[1], 0, ARGV[2]-ARGV[1])
-- 2\. 统计当前元素数量
local res = redis.call('zcard', KEYS[1])
-- 3\. 是否超过阈值
if (res == nil) or (res < tonumber(ARGV[3])) then
redis.call('zadd', KEYS[1], ARGV[2], ARGV[4])
redis.call('expire', KEYS[1], ARGV[1]/1000)
return 0
else
return 1
end
Loading…
Cancel
Save