diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/domain/DeduplicationParam.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationParam.java similarity index 94% rename from austin-handler/src/main/java/com/java3y/austin/handler/domain/DeduplicationParam.java rename to austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationParam.java index e6c98c6..fe74595 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/domain/DeduplicationParam.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationParam.java @@ -1,4 +1,4 @@ -package com.java3y.austin.handler.domain; +package com.java3y.austin.handler.deduplication; import com.alibaba.fastjson.annotation.JSONField; import com.java3y.austin.common.domain.TaskInfo; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationRuleService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationRuleService.java index 0577827..24cb518 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationRuleService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationRuleService.java @@ -5,7 +5,6 @@ import com.ctrip.framework.apollo.spring.annotation.ApolloConfig; import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.DeduplicationType; -import com.java3y.austin.handler.domain.DeduplicationParam; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/AbstractDeduplicationBuilder.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/AbstractDeduplicationBuilder.java index 722317f..f69c2a8 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/AbstractDeduplicationBuilder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/AbstractDeduplicationBuilder.java @@ -3,7 +3,7 @@ package com.java3y.austin.handler.deduplication.build; import com.alibaba.fastjson.JSONObject; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.handler.deduplication.DeduplicationHolder; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.PostConstruct; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/Builder.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/Builder.java index abe55e8..8631037 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/Builder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/Builder.java @@ -1,7 +1,7 @@ package com.java3y.austin.handler.deduplication.build; import com.java3y.austin.common.domain.TaskInfo; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; /** * @author luohaojie diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/ContentDeduplicationBuilder.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/ContentDeduplicationBuilder.java index 11094a2..358c11e 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/ContentDeduplicationBuilder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/ContentDeduplicationBuilder.java @@ -3,7 +3,7 @@ package com.java3y.austin.handler.deduplication.build; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.common.enums.DeduplicationType; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; import org.springframework.stereotype.Service; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/FrequencyDeduplicationBuilder.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/FrequencyDeduplicationBuilder.java index ae4603a..3d83962 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/FrequencyDeduplicationBuilder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/FrequencyDeduplicationBuilder.java @@ -4,7 +4,7 @@ import cn.hutool.core.date.DateUtil; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.common.enums.DeduplicationType; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; import org.springframework.stereotype.Service; import java.util.Date; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java index 9c0bbc4..2dc1b6f 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java @@ -5,13 +5,14 @@ import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.handler.deduplication.DeduplicationHolder; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; import com.java3y.austin.support.utils.LogUtils; import com.java3y.austin.support.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.PostConstruct; +import java.time.LocalDateTime; import java.util.*; /** @@ -116,5 +117,4 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi return result; } - } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/DeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/DeduplicationService.java index 6eac046..a34a0b6 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/DeduplicationService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/DeduplicationService.java @@ -1,7 +1,7 @@ package com.java3y.austin.handler.deduplication.service; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; /** * @author huskey diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/enums/RateLimitStrategy.java b/austin-handler/src/main/java/com/java3y/austin/handler/enums/RateLimitStrategy.java new file mode 100644 index 0000000..0ada834 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/enums/RateLimitStrategy.java @@ -0,0 +1,26 @@ +package com.java3y.austin.handler.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +/** + * 限流枚举 + * + * @author 3y + */ +@Getter +@ToString +@AllArgsConstructor +public enum RateLimitStrategy { + + + REQUEST_RATE_LIMIT(10, "根据真实请求数限流"), + SEND_USER_NUM_RATE_LIMIT(20, "根据发送用户数请求数限流"), + ; + + private Integer code; + private String description; + + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlParam.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlParam.java new file mode 100644 index 0000000..9dfa9af --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlParam.java @@ -0,0 +1,39 @@ +package com.java3y.austin.handler.flowcontrol; + +import com.google.common.util.concurrent.RateLimiter; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author 3y + * @date 2022/4/18 + *

+ * 流量控制所需要的参数 + */ +@Builder +@Data +@AllArgsConstructor +@NoArgsConstructor +public class FlowControlParam { + + /** + * 限流器 + * 子类初始化的时候指定 + */ + protected RateLimiter rateLimiter; + + /** + * 限流器初始限流大小 + * 子类初始化的时候指定 + */ + protected Double rateInitValue; + + /** + * 限流的策略 + * 子类初始化的时候指定 + */ + protected RateLimitStrategy rateLimitStrategy; +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java new file mode 100644 index 0000000..8986c6a --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java @@ -0,0 +1,20 @@ +package com.java3y.austin.handler.flowcontrol; + +import com.java3y.austin.common.domain.TaskInfo; + +/** + * @author 3y + * 流量控制服务 + */ +public interface FlowControlService { + + + /** + * 根据渠道进行流量控制 + * + * @param taskInfo + * @param flowControlParam + */ + void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam); + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/FlowControlServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/FlowControlServiceImpl.java new file mode 100644 index 0000000..5713a2c --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/FlowControlServiceImpl.java @@ -0,0 +1,77 @@ +package com.java3y.austin.handler.flowcontrol.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.ctrip.framework.apollo.Config; +import com.ctrip.framework.apollo.spring.annotation.ApolloConfig; +import com.google.common.util.concurrent.RateLimiter; +import com.java3y.austin.common.constant.AustinConstant; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.common.enums.ChannelType; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import com.java3y.austin.handler.flowcontrol.FlowControlParam; +import com.java3y.austin.handler.flowcontrol.FlowControlService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * @author 3y + * @date 2022/4/18 + */ +@Service +@Slf4j +public class FlowControlServiceImpl implements FlowControlService { + + private static final String FLOW_CONTROL_KEY = "flowControl"; + + private static final String FLOW_CONTROL_PREFIX = "flow_control_"; + + @ApolloConfig("boss.austin") + private Config config; + + + @Override + public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) { + RateLimiter rateLimiter = flowControlParam.getRateLimiter(); + Double rateInitValue = flowControlParam.getRateInitValue(); + + double costTime = 0; + + // 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准 + Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel()); + if (rateLimitConfig != null && !rateInitValue.equals(rateLimitConfig)) { + rateLimiter = RateLimiter.create(rateLimitConfig); + flowControlParam.setRateInitValue(rateLimitConfig); + flowControlParam.setRateLimiter(rateLimiter); + } + if (RateLimitStrategy.REQUEST_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { + costTime = rateLimiter.acquire(1); + } + if (RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { + costTime = rateLimiter.acquire(taskInfo.getReceiver().size()); + } + + if (costTime > 0) { + log.info("consumer {} flow control time {}", + ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime); + } + } + + /** + * 得到限流值的配置 + *

+ * apollo配置样例 key:flowControl value:{"flow_control_40":1} + *

+ * 渠道枚举可看:com.java3y.austin.common.enums.ChannelType + * + * @param channelCode + */ + private Double getRateLimitConfig(Integer channelCode) { + String flowControlConfig = config.getProperty(FLOW_CONTROL_KEY, AustinConstant.APOLLO_DEFAULT_VALUE_JSON_OBJECT); + JSONObject jsonObject = JSON.parseObject(flowControlConfig); + if (jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode) == null) { + return null; + } + return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java index ec16778..8c21086 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java @@ -1,8 +1,12 @@ package com.java3y.austin.handler.handler; +import com.google.common.util.concurrent.RateLimiter; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import com.java3y.austin.handler.flowcontrol.FlowControlParam; +import com.java3y.austin.handler.flowcontrol.FlowControlService; import com.java3y.austin.support.utils.LogUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -20,12 +24,19 @@ public abstract class BaseHandler implements Handler { */ protected Integer channelCode; + /** + * 限流相关的参数 + * 子类初始化的时候指定 + */ + protected FlowControlParam flowControlParam; + @Autowired private HandlerHolder handlerHolder; @Autowired private LogUtils logUtils; - + @Autowired + private FlowControlService flowControlService; /** * 初始化渠道与Handler的映射关系 @@ -37,6 +48,7 @@ public abstract class BaseHandler implements Handler { @Override public void doHandler(TaskInfo taskInfo) { + flowControl(taskInfo); if (handler(taskInfo)) { logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); return; @@ -44,6 +56,19 @@ public abstract class BaseHandler implements Handler { logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); } + + /** + * 流量控制 + * + * @param taskInfo + */ + public void flowControl(TaskInfo taskInfo) { + // 只有子类指定了限流参数,才需要限流 + if (flowControlParam != null) { + flowControlService.flowControl(taskInfo, flowControlParam); + } + } + /** * 统一处理的handler接口 * @@ -52,4 +77,6 @@ public abstract class BaseHandler implements Handler { */ public abstract boolean handler(TaskInfo taskInfo); + + } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java index 64e71ee..4099515 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java @@ -4,10 +4,13 @@ package com.java3y.austin.handler.handler.impl; import cn.hutool.extra.mail.MailAccount; import cn.hutool.extra.mail.MailUtil; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.RateLimiter; import com.java3y.austin.common.constant.SendAccountConstant; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.model.EmailContentModel; import com.java3y.austin.common.enums.ChannelType; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import com.java3y.austin.handler.flowcontrol.FlowControlParam; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.support.utils.AccountUtils; @@ -30,6 +33,13 @@ public class EmailHandler extends BaseHandler implements Handler { public EmailHandler() { channelCode = ChannelType.EMAIL.getCode(); + + // 按照请求限流,默认单机 3 qps (具体数值配置在apollo动态调整) + Double rateInitValue = Double.valueOf(3); + flowControlParam = FlowControlParam.builder().rateInitValue(rateInitValue) + .rateLimitStrategy(RateLimitStrategy.REQUEST_RATE_LIMIT) + .rateLimiter(RateLimiter.create(rateInitValue)).build(); + } @Override