去重功能实现V1

pull/2/head
3y 3 years ago
parent 67429a8ea8
commit 39438f1e9c

@ -24,10 +24,6 @@
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.tencentcloudapi</groupId>

@ -0,0 +1,31 @@
package com.java3y.austin.domain;
import lombok.Builder;
import lombok.Data;
/**
* @author 3y
* @date 2021/12/11
*
*/
@Builder
@Data
public class DeduplicationParam {
/**
* TaskIno
*/
private TaskInfo taskInfo;
/**
*
*
*/
private Long deduplicationTime;
/**
*
*/
private Integer countNum;
}

@ -1,8 +1,10 @@
package com.java3y.austin.pending;
import cn.hutool.core.collection.CollUtil;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.handler.HandlerHolder;
import com.java3y.austin.service.deduplication.DeduplicationRuleService;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
@ -24,18 +26,24 @@ public class Task implements Runnable {
@Autowired
private HandlerHolder handlerHolder;
@Autowired
private DeduplicationRuleService deduplicationRuleService;
private TaskInfo taskInfo;
@Override
public void run() {
// 0. TODO 丢弃消息
// 1. TODO 通用去重
// 1.平台通用去重
deduplicationRuleService.duplication(taskInfo);
// 2. 真正发送消息
handlerHolder.route(taskInfo.getSendChannel())
.doHandler(taskInfo);
if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {
handlerHolder.route(taskInfo.getSendChannel())
.doHandler(taskInfo);
}
}
}

@ -0,0 +1,100 @@
package com.java3y.austin.service.deduplication;
import cn.hutool.core.collection.CollUtil;
import com.java3y.austin.constant.AustinConstant;
import com.java3y.austin.domain.DeduplicationParam;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
/**
* @author 3y
* @date 2021/12/9
*
*/
@Slf4j
public abstract class AbstractDeduplicationService {
@Autowired
private RedisUtils redisUtils;
public void deduplication(DeduplicationParam param) {
TaskInfo taskInfo = param.getTaskInfo();
Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
// 获取redis记录
Set<String> readyPutRedisReceiver = new HashSet<>(taskInfo.getReceiver().size());
List<String> keys = deduplicationAllKey(taskInfo);
Map<String, String> inRedisValue = redisUtils.mGet(keys);
for (String receiver : taskInfo.getReceiver()) {
String key = deduplicationSingleKey(taskInfo, receiver);
String value = inRedisValue.get(key);
// 符合条件的用户
if (value != null && Integer.valueOf(value) >= param.getCountNum()) {
filterReceiver.add(receiver);
} else {
readyPutRedisReceiver.add(receiver);
}
}
// 不符合条件的用户需要更新Redis(无记录添加,有记录则累加次数)
putInRedis(readyPutRedisReceiver, inRedisValue, param);
// 剔除符合去重条件的用户
taskInfo.getReceiver().removeAll(filterReceiver);
}
/**
* Key
*
* @param taskInfo
* @param receiver
* @return
*/
protected abstract String deduplicationSingleKey(TaskInfo taskInfo, String receiver);
/**
* redis
*
* @param readyPutRedisReceiver
*/
private void putInRedis(Set<String> readyPutRedisReceiver,
Map<String, String> inRedisValue, DeduplicationParam param) {
Map<String, String> keyValues = new HashMap<>(readyPutRedisReceiver.size());
for (String receiver : readyPutRedisReceiver) {
String key = deduplicationSingleKey(param.getTaskInfo(), receiver);
if (inRedisValue.get(key) != null) {
keyValues.put(key, String.valueOf(Integer.valueOf(inRedisValue.get(key)) + 1));
} else {
keyValues.put(key, String.valueOf(AustinConstant.TRUE));
}
}
if (CollUtil.isNotEmpty(keyValues)) {
redisUtils.pipelineSetEX(keyValues, param.getDeduplicationTime());
}
}
/**
* Key
*
* @param taskInfo
* @return
*/
private List<String> deduplicationAllKey(TaskInfo taskInfo) {
List<String> result = new ArrayList<>(taskInfo.getReceiver().size());
for (String receiver : taskInfo.getReceiver()) {
String key = deduplicationSingleKey(taskInfo, receiver);
result.add(key);
}
return result;
}
}

@ -0,0 +1,31 @@
package com.java3y.austin.service.deduplication;
import cn.hutool.crypto.digest.DigestUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.domain.TaskInfo;
import org.springframework.stereotype.Service;
/**
* @author 3y
* @date 2021/12/11
* 5
*/
@Service
public class ContentAbstractDeduplicationService extends AbstractDeduplicationService {
/**
* key
* <p>
* key: md5(templateId + templateId + content)
* <p>
*
*
* @param taskInfo
* @return
*/
@Override
public String deduplicationSingleKey(TaskInfo taskInfo, String receiver) {
return DigestUtil.md5Hex(taskInfo.getMessageTemplateId() + receiver
+ JSON.toJSONString(taskInfo.getContentModel()));
}
}

@ -0,0 +1,42 @@
package com.java3y.austin.service.deduplication;
import cn.hutool.core.date.DateUtil;
import com.java3y.austin.domain.DeduplicationParam;
import com.java3y.austin.domain.TaskInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* @author 3y
* @date 2021/12/12
*
*/
@Service
public class DeduplicationRuleService {
@Autowired
private ContentAbstractDeduplicationService contentDeduplicationService;
@Autowired
private FrequencyDeduplicationService frequencyDeduplicationService;
public void duplication(TaskInfo taskInfo) {
// 文案去重
DeduplicationParam contentParams = DeduplicationParam.builder()
.deduplicationTime(300L).countNum(1).taskInfo(taskInfo)
.build();
contentDeduplicationService.deduplication(contentParams);
// 运营总规则去重(一天内用户收到最多同一个渠道的消息次数)
Long seconds = (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000;
DeduplicationParam businessParams = DeduplicationParam.builder()
.deduplicationTime(seconds).countNum(5).taskInfo(taskInfo)
.build();
frequencyDeduplicationService.deduplication(businessParams);
}
}

@ -0,0 +1,35 @@
package com.java3y.austin.service.deduplication;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.domain.TaskInfo;
import org.springframework.stereotype.Service;
/**
* @author 3y
* @date 2021/12/12
*
*/
@Service
public class FrequencyDeduplicationService extends AbstractDeduplicationService {
private static final String PREFIX = "FRE";
/**
* key
* <p>
* key receiver + templateId + sendChannel
* <p>
* N
*
* @param taskInfo
* @param receiver
* @return
*/
@Override
public String deduplicationSingleKey(TaskInfo taskInfo, String receiver) {
return PREFIX + StrUtil.C_UNDERLINE
+ receiver + StrUtil.C_UNDERLINE
+ taskInfo.getMessageTemplateId() + StrUtil.C_UNDERLINE
+ taskInfo.getSendChannel();
}
}

@ -57,6 +57,12 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,63 @@
package com.java3y.austin.utils;
import cn.hutool.core.collection.CollUtil;
import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author 3y
* @date 2021/12/10
* Redis
*/
@Component
@Slf4j
public class RedisUtils {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* mGetMap
*
* @param keys
*/
public Map<String, String> mGet(List<String> keys) {
HashMap<String, String> result = new HashMap<>(keys.size());
try {
List<String> value = redisTemplate.opsForValue().multiGet(keys);
if (CollUtil.isNotEmpty(value)) {
for (int i = 0; i < keys.size(); i++) {
result.put(keys.get(i), value.get(i));
}
}
} catch (Exception e) {
log.error("redis mGet fail! e:{}", Throwables.getStackTraceAsString(e));
}
return result;
}
/**
* pipeline key-value
*/
public void pipelineSetEX(Map<String, String> keyValues, Long seconds) {
try {
redisTemplate.executePipelined((RedisCallback<String>) connection -> {
for (Map.Entry<String, String> entry : keyValues.entrySet()) {
connection.setEx(entry.getKey().getBytes(), seconds,
entry.getValue().getBytes());
}
return null;
});
} catch (Exception e) {
log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e));
}
}
}
Loading…
Cancel
Save