From 39438f1e9cdff516f26f3de29065163e5d0a0c66 Mon Sep 17 00:00:00 2001 From: 3y Date: Sun, 12 Dec 2021 22:03:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=87=8D=E5=8A=9F=E8=83=BD=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0V1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- austin-handler/pom.xml | 4 - .../austin/domain/DeduplicationParam.java | 31 ++++++ .../java/com/java3y/austin/pending/Task.java | 16 ++- .../AbstractDeduplicationService.java | 100 ++++++++++++++++++ .../ContentAbstractDeduplicationService.java | 31 ++++++ .../DeduplicationRuleService.java | 42 ++++++++ .../FrequencyDeduplicationService.java | 35 ++++++ austin-support/pom.xml | 6 ++ .../com/java3y/austin/utils/RedisUtils.java | 63 +++++++++++ 9 files changed, 320 insertions(+), 8 deletions(-) create mode 100644 austin-handler/src/main/java/com/java3y/austin/domain/DeduplicationParam.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/service/deduplication/AbstractDeduplicationService.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/service/deduplication/ContentAbstractDeduplicationService.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/service/deduplication/DeduplicationRuleService.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/service/deduplication/FrequencyDeduplicationService.java create mode 100644 austin-support/src/main/java/com/java3y/austin/utils/RedisUtils.java diff --git a/austin-handler/pom.xml b/austin-handler/pom.xml index ef643cb..47b1552 100644 --- a/austin-handler/pom.xml +++ b/austin-handler/pom.xml @@ -24,10 +24,6 @@ 0.0.1-SNAPSHOT - - org.springframework.boot - spring-boot-starter-data-redis - com.tencentcloudapi diff --git a/austin-handler/src/main/java/com/java3y/austin/domain/DeduplicationParam.java b/austin-handler/src/main/java/com/java3y/austin/domain/DeduplicationParam.java new file mode 100644 index 0000000..46eefff --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/domain/DeduplicationParam.java @@ -0,0 +1,31 @@ +package com.java3y.austin.domain; + +import lombok.Builder; +import lombok.Data; + +/** + * @author 3y + * @date 2021/12/11 + * 去重服务所需要的参数 + */ +@Builder +@Data +public class DeduplicationParam { + + /** + * TaskIno信息 + */ + private TaskInfo taskInfo; + + /** + * 去重时间 + * 单位:秒 + */ + private Long deduplicationTime; + + /** + * 需达到的次数去重 + */ + private Integer countNum; + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/pending/Task.java b/austin-handler/src/main/java/com/java3y/austin/pending/Task.java index 04812c1..dff270a 100644 --- a/austin-handler/src/main/java/com/java3y/austin/pending/Task.java +++ b/austin-handler/src/main/java/com/java3y/austin/pending/Task.java @@ -1,8 +1,10 @@ package com.java3y.austin.pending; +import cn.hutool.core.collection.CollUtil; import com.java3y.austin.domain.TaskInfo; import com.java3y.austin.handler.HandlerHolder; +import com.java3y.austin.service.deduplication.DeduplicationRuleService; import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; @@ -24,18 +26,24 @@ public class Task implements Runnable { @Autowired private HandlerHolder handlerHolder; + @Autowired + private DeduplicationRuleService deduplicationRuleService; + private TaskInfo taskInfo; @Override public void run() { - // 0. TODO 丢弃消息 - // 1. TODO 通用去重 + // 1.平台通用去重 + deduplicationRuleService.duplication(taskInfo); + // 2. 真正发送消息 - handlerHolder.route(taskInfo.getSendChannel()) - .doHandler(taskInfo); + if (CollUtil.isNotEmpty(taskInfo.getReceiver())) { + handlerHolder.route(taskInfo.getSendChannel()) + .doHandler(taskInfo); + } } } diff --git a/austin-handler/src/main/java/com/java3y/austin/service/deduplication/AbstractDeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/service/deduplication/AbstractDeduplicationService.java new file mode 100644 index 0000000..4fa703b --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/service/deduplication/AbstractDeduplicationService.java @@ -0,0 +1,100 @@ +package com.java3y.austin.service.deduplication; + +import cn.hutool.core.collection.CollUtil; +import com.java3y.austin.constant.AustinConstant; +import com.java3y.austin.domain.DeduplicationParam; +import com.java3y.austin.domain.TaskInfo; +import com.java3y.austin.utils.RedisUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.*; + +/** + * @author 3y + * @date 2021/12/9 + * 去重服务 + */ +@Slf4j +public abstract class AbstractDeduplicationService { + + @Autowired + private RedisUtils redisUtils; + + public void deduplication(DeduplicationParam param) { + TaskInfo taskInfo = param.getTaskInfo(); + Set filterReceiver = new HashSet<>(taskInfo.getReceiver().size()); + + // 获取redis记录 + Set readyPutRedisReceiver = new HashSet<>(taskInfo.getReceiver().size()); + List keys = deduplicationAllKey(taskInfo); + Map inRedisValue = redisUtils.mGet(keys); + + for (String receiver : taskInfo.getReceiver()) { + String key = deduplicationSingleKey(taskInfo, receiver); + String value = inRedisValue.get(key); + + // 符合条件的用户 + if (value != null && Integer.valueOf(value) >= param.getCountNum()) { + filterReceiver.add(receiver); + } else { + readyPutRedisReceiver.add(receiver); + } + } + + // 不符合条件的用户:需要更新Redis(无记录添加,有记录则累加次数) + putInRedis(readyPutRedisReceiver, inRedisValue, param); + + // 剔除符合去重条件的用户 + taskInfo.getReceiver().removeAll(filterReceiver); + } + + + /** + * 构建去重的Key + * + * @param taskInfo + * @param receiver + * @return + */ + protected abstract String deduplicationSingleKey(TaskInfo taskInfo, String receiver); + + + /** + * 存入redis 实现去重 + * + * @param readyPutRedisReceiver + */ + private void putInRedis(Set readyPutRedisReceiver, + Map inRedisValue, DeduplicationParam param) { + Map keyValues = new HashMap<>(readyPutRedisReceiver.size()); + for (String receiver : readyPutRedisReceiver) { + String key = deduplicationSingleKey(param.getTaskInfo(), receiver); + if (inRedisValue.get(key) != null) { + keyValues.put(key, String.valueOf(Integer.valueOf(inRedisValue.get(key)) + 1)); + } else { + keyValues.put(key, String.valueOf(AustinConstant.TRUE)); + } + } + if (CollUtil.isNotEmpty(keyValues)) { + redisUtils.pipelineSetEX(keyValues, param.getDeduplicationTime()); + } + } + + /** + * 获取得到当前消息模板所有的去重Key + * + * @param taskInfo + * @return + */ + private List deduplicationAllKey(TaskInfo taskInfo) { + List result = new ArrayList<>(taskInfo.getReceiver().size()); + for (String receiver : taskInfo.getReceiver()) { + String key = deduplicationSingleKey(taskInfo, receiver); + result.add(key); + } + return result; + } + + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/service/deduplication/ContentAbstractDeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/service/deduplication/ContentAbstractDeduplicationService.java new file mode 100644 index 0000000..ec800e0 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/service/deduplication/ContentAbstractDeduplicationService.java @@ -0,0 +1,31 @@ +package com.java3y.austin.service.deduplication; + +import cn.hutool.crypto.digest.DigestUtil; +import com.alibaba.fastjson.JSON; +import com.java3y.austin.domain.TaskInfo; +import org.springframework.stereotype.Service; + +/** + * @author 3y + * @date 2021/12/11 + * 内容去重服务(默认5分钟相同的文案发给相同的用户去重) + */ +@Service +public class ContentAbstractDeduplicationService extends AbstractDeduplicationService { + + /** + * 内容去重 构建key + *

+ * key: md5(templateId + templateId + content) + *

+ * 相同的内容相同的模板短时间内发给同一个人 + * + * @param taskInfo + * @return + */ + @Override + public String deduplicationSingleKey(TaskInfo taskInfo, String receiver) { + return DigestUtil.md5Hex(taskInfo.getMessageTemplateId() + receiver + + JSON.toJSONString(taskInfo.getContentModel())); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/service/deduplication/DeduplicationRuleService.java b/austin-handler/src/main/java/com/java3y/austin/service/deduplication/DeduplicationRuleService.java new file mode 100644 index 0000000..971fd20 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/service/deduplication/DeduplicationRuleService.java @@ -0,0 +1,42 @@ +package com.java3y.austin.service.deduplication; + +import cn.hutool.core.date.DateUtil; +import com.java3y.austin.domain.DeduplicationParam; +import com.java3y.austin.domain.TaskInfo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Date; + +/** + * @author 3y + * @date 2021/12/12 + * 去重服务 + */ +@Service +public class DeduplicationRuleService { + + @Autowired + private ContentAbstractDeduplicationService contentDeduplicationService; + + @Autowired + private FrequencyDeduplicationService frequencyDeduplicationService; + + + public void duplication(TaskInfo taskInfo) { + + // 文案去重 + DeduplicationParam contentParams = DeduplicationParam.builder() + .deduplicationTime(300L).countNum(1).taskInfo(taskInfo) + .build(); + contentDeduplicationService.deduplication(contentParams); + + // 运营总规则去重(一天内用户收到最多同一个渠道的消息次数) + Long seconds = (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000; + DeduplicationParam businessParams = DeduplicationParam.builder() + .deduplicationTime(seconds).countNum(5).taskInfo(taskInfo) + .build(); + frequencyDeduplicationService.deduplication(businessParams); + } + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/service/deduplication/FrequencyDeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/service/deduplication/FrequencyDeduplicationService.java new file mode 100644 index 0000000..c135f46 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/service/deduplication/FrequencyDeduplicationService.java @@ -0,0 +1,35 @@ +package com.java3y.austin.service.deduplication; + +import cn.hutool.core.util.StrUtil; +import com.java3y.austin.domain.TaskInfo; +import org.springframework.stereotype.Service; + +/** + * @author 3y + * @date 2021/12/12 + * 频次去重服务 + */ +@Service +public class FrequencyDeduplicationService extends AbstractDeduplicationService { + + private static final String PREFIX = "FRE"; + + /** + * 业务规则去重 构建key + *

+ * key : receiver + templateId + sendChannel + *

+ * 一天内一个用户只能收到某个渠道的消息 N 次 + * + * @param taskInfo + * @param receiver + * @return + */ + @Override + public String deduplicationSingleKey(TaskInfo taskInfo, String receiver) { + return PREFIX + StrUtil.C_UNDERLINE + + receiver + StrUtil.C_UNDERLINE + + taskInfo.getMessageTemplateId() + StrUtil.C_UNDERLINE + + taskInfo.getSendChannel(); + } +} diff --git a/austin-support/pom.xml b/austin-support/pom.xml index 9fc1cff..59ff73b 100644 --- a/austin-support/pom.xml +++ b/austin-support/pom.xml @@ -57,6 +57,12 @@ org.springframework.kafka spring-kafka + + + org.springframework.boot + spring-boot-starter-data-redis + + \ No newline at end of file diff --git a/austin-support/src/main/java/com/java3y/austin/utils/RedisUtils.java b/austin-support/src/main/java/com/java3y/austin/utils/RedisUtils.java new file mode 100644 index 0000000..050b0eb --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/utils/RedisUtils.java @@ -0,0 +1,63 @@ +package com.java3y.austin.utils; + +import cn.hutool.core.collection.CollUtil; +import com.google.common.base.Throwables; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author 3y + * @date 2021/12/10 + * 对Redis的某些操作二次封装 + */ +@Component +@Slf4j +public class RedisUtils { + + @Autowired + private StringRedisTemplate redisTemplate; + + /** + * mGet将结果封装为Map + * + * @param keys + */ + public Map mGet(List keys) { + HashMap result = new HashMap<>(keys.size()); + try { + List value = redisTemplate.opsForValue().multiGet(keys); + if (CollUtil.isNotEmpty(value)) { + for (int i = 0; i < keys.size(); i++) { + result.put(keys.get(i), value.get(i)); + } + } + } catch (Exception e) { + log.error("redis mGet fail! e:{}", Throwables.getStackTraceAsString(e)); + } + return result; + } + + /** + * pipeline 设置 key-value 并设置过期时间 + */ + public void pipelineSetEX(Map keyValues, Long seconds) { + try { + redisTemplate.executePipelined((RedisCallback) connection -> { + for (Map.Entry entry : keyValues.entrySet()) { + connection.setEx(entry.getKey().getBytes(), seconds, + entry.getValue().getBytes()); + } + return null; + }); + } catch (Exception e) { + log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e)); + } + } +}