单机限流实现

pull/6/head
3y 2 years ago
parent 9420e13919
commit e8deae88f2

@ -1,4 +1,4 @@
package com.java3y.austin.handler.domain;
package com.java3y.austin.handler.deduplication;
import com.alibaba.fastjson.annotation.JSONField;
import com.java3y.austin.common.domain.TaskInfo;

@ -5,7 +5,6 @@ import com.ctrip.framework.apollo.spring.annotation.ApolloConfig;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.domain.DeduplicationParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@ -3,7 +3,7 @@ package com.java3y.austin.handler.deduplication.build;
import com.alibaba.fastjson.JSONObject;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationHolder;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;

@ -1,7 +1,7 @@
package com.java3y.austin.handler.deduplication.build;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
/**
* @author luohaojie

@ -3,7 +3,7 @@ package com.java3y.austin.handler.deduplication.build;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import org.springframework.stereotype.Service;

@ -4,7 +4,7 @@ import cn.hutool.core.date.DateUtil;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import org.springframework.stereotype.Service;
import java.util.Date;

@ -5,13 +5,14 @@ import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationHolder;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import com.java3y.austin.support.utils.LogUtils;
import com.java3y.austin.support.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.*;
/**
@ -116,5 +117,4 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi
return result;
}
}

@ -1,7 +1,7 @@
package com.java3y.austin.handler.deduplication.service;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
/**
* @author huskey

@ -0,0 +1,26 @@
package com.java3y.austin.handler.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
/**
*
*
* @author 3y
*/
@Getter
@ToString
@AllArgsConstructor
public enum RateLimitStrategy {
REQUEST_RATE_LIMIT(10, "根据真实请求数限流"),
SEND_USER_NUM_RATE_LIMIT(20, "根据发送用户数请求数限流"),
;
private Integer code;
private String description;
}

@ -0,0 +1,39 @@
package com.java3y.austin.handler.flowcontrol;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author 3y
* @date 2022/4/18
* <p>
*
*/
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowControlParam {
/**
*
*
*/
protected RateLimiter rateLimiter;
/**
*
*
*/
protected Double rateInitValue;
/**
*
*
*/
protected RateLimitStrategy rateLimitStrategy;
}

@ -0,0 +1,20 @@
package com.java3y.austin.handler.flowcontrol;
import com.java3y.austin.common.domain.TaskInfo;
/**
* @author 3y
*
*/
public interface FlowControlService {
/**
*
*
* @param taskInfo
* @param flowControlParam
*/
void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam);
}

@ -0,0 +1,77 @@
package com.java3y.austin.handler.flowcontrol.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.spring.annotation.ApolloConfig;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author 3y
* @date 2022/4/18
*/
@Service
@Slf4j
public class FlowControlServiceImpl implements FlowControlService {
private static final String FLOW_CONTROL_KEY = "flowControl";
private static final String FLOW_CONTROL_PREFIX = "flow_control_";
@ApolloConfig("boss.austin")
private Config config;
@Override
public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter = flowControlParam.getRateLimiter();
Double rateInitValue = flowControlParam.getRateInitValue();
double costTime = 0;
// 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准
Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel());
if (rateLimitConfig != null && !rateInitValue.equals(rateLimitConfig)) {
rateLimiter = RateLimiter.create(rateLimitConfig);
flowControlParam.setRateInitValue(rateLimitConfig);
flowControlParam.setRateLimiter(rateLimiter);
}
if (RateLimitStrategy.REQUEST_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) {
costTime = rateLimiter.acquire(1);
}
if (RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) {
costTime = rateLimiter.acquire(taskInfo.getReceiver().size());
}
if (costTime > 0) {
log.info("consumer {} flow control time {}",
ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime);
}
}
/**
*
* <p>
* apollo keyflowControl value{"flow_control_40":1}
* <p>
* com.java3y.austin.common.enums.ChannelType
*
* @param channelCode
*/
private Double getRateLimitConfig(Integer channelCode) {
String flowControlConfig = config.getProperty(FLOW_CONTROL_KEY, AustinConstant.APOLLO_DEFAULT_VALUE_JSON_OBJECT);
JSONObject jsonObject = JSON.parseObject(flowControlConfig);
if (jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode) == null) {
return null;
}
return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode);
}
}

@ -1,8 +1,12 @@
package com.java3y.austin.handler.handler;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.support.utils.LogUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -20,12 +24,19 @@ public abstract class BaseHandler implements Handler {
*/
protected Integer channelCode;
/**
*
*
*/
protected FlowControlParam flowControlParam;
@Autowired
private HandlerHolder handlerHolder;
@Autowired
private LogUtils logUtils;
@Autowired
private FlowControlService flowControlService;
/**
* Handler
@ -37,6 +48,7 @@ public abstract class BaseHandler implements Handler {
@Override
public void doHandler(TaskInfo taskInfo) {
flowControl(taskInfo);
if (handler(taskInfo)) {
logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
return;
@ -44,6 +56,19 @@ public abstract class BaseHandler implements Handler {
logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
}
/**
*
*
* @param taskInfo
*/
public void flowControl(TaskInfo taskInfo) {
// 只有子类指定了限流参数,才需要限流
if (flowControlParam != null) {
flowControlService.flowControl(taskInfo, flowControlParam);
}
}
/**
* handler
*
@ -52,4 +77,6 @@ public abstract class BaseHandler implements Handler {
*/
public abstract boolean handler(TaskInfo taskInfo);
}

@ -4,10 +4,13 @@ package com.java3y.austin.handler.handler.impl;
import cn.hutool.extra.mail.MailAccount;
import cn.hutool.extra.mail.MailUtil;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.model.EmailContentModel;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.handler.Handler;
import com.java3y.austin.support.utils.AccountUtils;
@ -30,6 +33,13 @@ public class EmailHandler extends BaseHandler implements Handler {
public EmailHandler() {
channelCode = ChannelType.EMAIL.getCode();
// 按照请求限流,默认单机 3 qps 具体数值配置在apollo动态调整)
Double rateInitValue = Double.valueOf(3);
flowControlParam = FlowControlParam.builder().rateInitValue(rateInitValue)
.rateLimitStrategy(RateLimitStrategy.REQUEST_RATE_LIMIT)
.rateLimiter(RateLimiter.create(rateInitValue)).build();
}
@Override

Loading…
Cancel
Save