merge master into vip

pull/13/head
3y 2 years ago
commit 55e778f798

@ -36,7 +36,7 @@ public class TaskHandlerImpl implements TaskHandler {
public void handle(Long messageTemplateId) { public void handle(Long messageTemplateId) {
MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get(); MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get();
if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) { if (StrUtil.isBlank(messageTemplate.getCronCrowdPath())) {
log.error("TaskHandler#handle crowdPath empty! messageTemplateId:{}", messageTemplateId); log.error("TaskHandler#handle crowdPath empty! messageTemplateId:{}", messageTemplateId);
return; return;
} }
@ -48,14 +48,14 @@ public class TaskHandlerImpl implements TaskHandler {
CrowdBatchTaskPending crowdBatchTaskPending = context.getBean(CrowdBatchTaskPending.class); CrowdBatchTaskPending crowdBatchTaskPending = context.getBean(CrowdBatchTaskPending.class);
ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> { ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> {
if (CollUtil.isEmpty(row.getFieldMap()) if (CollUtil.isEmpty(row.getFieldMap())
|| StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) { || StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) {
return; return;
} }
// 3. 每一行处理交给LazyPending // 3. 每一行处理交给LazyPending
HashMap<String, String> params = ReadFileUtils.getParamFromLine(row.getFieldMap()); HashMap<String, String> params = ReadFileUtils.getParamFromLine(row.getFieldMap());
CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY)) CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))
.params(params).messageTemplateId(messageTemplateId).build(); .params(params).messageTemplateId(messageTemplateId).build();
crowdBatchTaskPending.pending(crowdInfoVo); crowdBatchTaskPending.pending(crowdInfoVo);
// 4. 判断是否读取文件完成回收资源且更改状态 // 4. 判断是否读取文件完成回收资源且更改状态

@ -15,8 +15,8 @@ import java.util.Map;
@Service @Service
public class DeduplicationHolder { public class DeduplicationHolder {
private Map<Integer, Builder> builderHolder = new HashMap<>(4); private final Map<Integer, Builder> builderHolder = new HashMap<>(4);
private Map<Integer, DeduplicationService> serviceHolder = new HashMap<>(4); private final Map<Integer, DeduplicationService> serviceHolder = new HashMap<>(4);
public Builder selectBuilder(Integer key) { public Builder selectBuilder(Integer key) {
return builderHolder.get(key); return builderHolder.get(key);

@ -29,7 +29,7 @@ public class SimpleLimitService extends AbstractLimitService {
public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) { public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) {
Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size()); Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
// 获取redis记录 // 获取redis记录
Map<String, String> readyPutRedisReceiver = new HashMap(taskInfo.getReceiver().size()); Map<String, String> readyPutRedisReceiver = new HashMap<>(taskInfo.getReceiver().size());
//redis数据隔离 //redis数据隔离
List<String> keys = deduplicationAllKey(service, taskInfo).stream().map(key -> LIMIT_TAG + key).collect(Collectors.toList()); List<String> keys = deduplicationAllKey(service, taskInfo).stream().map(key -> LIMIT_TAG + key).collect(Collectors.toList());
Map<String, String> inRedisValue = redisUtils.mGet(keys); Map<String, String> inRedisValue = redisUtils.mGet(keys);
@ -59,7 +59,7 @@ public class SimpleLimitService extends AbstractLimitService {
* @param readyPutRedisReceiver * @param readyPutRedisReceiver
*/ */
private void putInRedis(Map<String, String> readyPutRedisReceiver, private void putInRedis(Map<String, String> readyPutRedisReceiver,
Map<String, String> inRedisValue, Long deduplicationTime) { Map<String, String> inRedisValue, Long deduplicationTime) {
Map<String, String> keyValues = new HashMap<>(readyPutRedisReceiver.size()); Map<String, String> keyValues = new HashMap<>(readyPutRedisReceiver.size());
for (Map.Entry<String, String> entry : readyPutRedisReceiver.entrySet()) { for (Map.Entry<String, String> entry : readyPutRedisReceiver.entrySet()) {
String key = entry.getValue(); String key = entry.getValue();

@ -1,4 +1,4 @@
package com.java3y.austin.handler.flowcontrol.impl; package com.java3y.austin.handler.flowcontrol;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
@ -7,11 +7,18 @@ import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.enums.RateLimitStrategy; import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam; import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.support.service.ConfigService; import com.java3y.austin.support.service.ConfigService;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
@ -20,22 +27,26 @@ import org.springframework.stereotype.Service;
*/ */
@Service @Service
@Slf4j @Slf4j
public class FlowControlServiceImpl implements FlowControlService { public class FlowControlFactory implements ApplicationContextAware {
private static final String FLOW_CONTROL_KEY = "flowControlRule"; private static final String FLOW_CONTROL_KEY = "flowControlRule";
private static final String FLOW_CONTROL_PREFIX = "flow_control_"; private static final String FLOW_CONTROL_PREFIX = "flow_control_";
private final Map<RateLimitStrategy, FlowControlService> flowControlServiceMap = new ConcurrentHashMap<>();
@Autowired @Autowired
private ConfigService config; private ConfigService config;
private ApplicationContext applicationContext;
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) { public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter = flowControlParam.getRateLimiter(); RateLimiter rateLimiter;
Double rateInitValue = flowControlParam.getRateInitValue(); Double rateInitValue = flowControlParam.getRateInitValue();
double costTime = 0;
// 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准 // 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准
Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel()); Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel());
if (rateLimitConfig != null && !rateInitValue.equals(rateLimitConfig)) { if (rateLimitConfig != null && !rateInitValue.equals(rateLimitConfig)) {
@ -43,16 +54,15 @@ public class FlowControlServiceImpl implements FlowControlService {
flowControlParam.setRateInitValue(rateLimitConfig); flowControlParam.setRateInitValue(rateLimitConfig);
flowControlParam.setRateLimiter(rateLimiter); flowControlParam.setRateLimiter(rateLimiter);
} }
if (RateLimitStrategy.REQUEST_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { FlowControlService flowControlService = flowControlServiceMap.get(flowControlParam.getRateLimitStrategy());
costTime = rateLimiter.acquire(1); if (Objects.isNull(flowControlService)) {
log.error("没有找到对应的单机限流策略");
return;
} }
if (RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { double costTime = flowControlService.flowControl(taskInfo, flowControlParam);
costTime = rateLimiter.acquire(taskInfo.getReceiver().size());
}
if (costTime > 0) { if (costTime > 0) {
log.info("consumer {} flow control time {}", log.info("consumer {} flow control time {}",
ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime); ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime);
} }
} }
@ -73,4 +83,17 @@ public class FlowControlServiceImpl implements FlowControlService {
} }
return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode); return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode);
} }
@PostConstruct
private void init() {
Map<String, Object> serviceMap = this.applicationContext.getBeansWithAnnotation(LocalRateLimit.class);
serviceMap.forEach((name, service) -> {
if (service instanceof FlowControlService) {
LocalRateLimit localRateLimit = AopUtils.getTargetClass(service).getAnnotation(LocalRateLimit.class);
RateLimitStrategy rateLimitStrategy = localRateLimit.rateLimitStrategy();
//通常情况下 实现的限流service与rateLimitStrategy一一对应
flowControlServiceMap.put(rateLimitStrategy, (FlowControlService) service);
}
});
}
} }

@ -15,6 +15,6 @@ public interface FlowControlService {
* @param taskInfo * @param taskInfo
* @param flowControlParam * @param flowControlParam
*/ */
void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam); Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam);
} }

@ -0,0 +1,22 @@
package com.java3y.austin.handler.flowcontrol.annotations;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Service;
/**
*
* Created by TOM
* On 2022/7/21 17:03
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service
public @interface LocalRateLimit {
RateLimitStrategy rateLimitStrategy() default RateLimitStrategy.REQUEST_RATE_LIMIT;
}

@ -0,0 +1,28 @@
package com.java3y.austin.handler.flowcontrol.impl;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit;
/**
* Created by TOM
* On 2022/7/21 17:05
*/
@LocalRateLimit(rateLimitStrategy = RateLimitStrategy.REQUEST_RATE_LIMIT)
public class RequestRateLimitService implements FlowControlService {
/**
*
*
* @param taskInfo
* @param flowControlParam
*/
@Override
public Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter = flowControlParam.getRateLimiter();
return rateLimiter.acquire(1);
}
}

@ -0,0 +1,28 @@
package com.java3y.austin.handler.flowcontrol.impl;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit;
/**
* Created by TOM
* On 2022/7/21 17:14
*/
@LocalRateLimit(rateLimitStrategy = RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT)
public class SendUserNumRateLimitService implements FlowControlService {
/**
*
*
* @param taskInfo
* @param flowControlParam
*/
@Override
public Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter = flowControlParam.getRateLimiter();
return rateLimiter.acquire(taskInfo.getReceiver().size());
}
}

@ -1,16 +1,13 @@
package com.java3y.austin.handler.handler; package com.java3y.austin.handler.handler;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.enums.RateLimitStrategy; import com.java3y.austin.handler.flowcontrol.FlowControlFactory;
import com.java3y.austin.handler.flowcontrol.FlowControlParam; import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.support.utils.LogUtils; import com.java3y.austin.support.utils.LogUtils;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
/** /**
* @author 3y * @author 3y
@ -22,7 +19,7 @@ public abstract class BaseHandler implements Handler {
@Autowired @Autowired
private LogUtils logUtils; private LogUtils logUtils;
@Autowired @Autowired
private FlowControlService flowControlService; private FlowControlFactory flowControlFactory;
/** /**
* Code * Code
@ -52,7 +49,7 @@ public abstract class BaseHandler implements Handler {
public void flowControl(TaskInfo taskInfo) { public void flowControl(TaskInfo taskInfo) {
// 只有子类指定了限流参数,才需要限流 // 只有子类指定了限流参数,才需要限流
if (flowControlParam != null) { if (flowControlParam != null) {
flowControlService.flowControl(taskInfo, flowControlParam); flowControlFactory.flowControl(taskInfo, flowControlParam);
} }
} }
@Override @Override

@ -50,7 +50,7 @@ public class ShieldServiceImpl implements ShieldService {
} }
if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) { if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) {
redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo, redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo,
new SerializerFeature[]{SerializerFeature.WriteClassName}), SerializerFeature.WriteClassName),
(DateUtil.offsetDay(new Date(), 1).getTime() / 1000) - DateUtil.currentSeconds()); (DateUtil.offsetDay(new Date(), 1).getTime() / 1000) - DateUtil.currentSeconds());
logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
} }

@ -13,19 +13,20 @@ austin-redis-ip=austin.redis
austin-redis-port=5003 austin-redis-port=5003
austin-redis-password=austin austin-redis-password=austin
# TODO kafka/eventbus # TODO choose : kafka/eventBus/rocketMq/rabbitMq
austin-mq-pipeline=kafka austin-mq-pipeline=kafka
# todo [kafka] ip/port【optional】, if austin-mq-pipeline=kafka 【must】 # todo [kafka] ip/port【optional】, if austin-mq-pipeline=kafka 【must】
austin-kafka-ip=austin.kafka austin-kafka-ip=austin.kafka
austin-kafka-port=9092 austin-kafka-port=9092
# todo [rocketmq] 【optional】, if austin-mq-pipeline=rocketMq【must】 # todo [rocketMq] 【optional】, if austin-mq-pipeline=rocketMq【must】
austin-rocketmq-nameserver-ip=127.0.0.1 austin-rocketmq-nameserver-ip=
austin-rocketmq-nameserver-port=9876 austin-rocketmq-nameserver-port=
austin-rocketmq-producer-group=unique-producer-group
austin-rocketmq-biz-consumer-group=unique-biz-consumer-group # todo [rabbitMq] 【optional】, if austin-mq-pipeline=rabbitMq【must】
austin-rocketmq-recall-consumer-group=unique-recall-consumer-group austin-rabbitmq-ip=
austin-rabbitmq-port=
# todo [xxl-job] switch/ip/port/【optional】 # todo [xxl-job] switch/ip/port/【optional】
xxl-job.enabled=false xxl-job.enabled=false
@ -60,16 +61,22 @@ spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.enable-auto-commit=true
##################### rocketmq properties ##################### ##################### rocketmq properties #####################
rocketmq.name-server=${austin-rocketmq-nameserver-ip}:${austin-rocketmq-nameserver-port} rocketmq.name-server=${austin-rocketmq-nameserver-ip}:${austin-rocketmq-nameserver-port}
rocketmq.producer.group=${austin-rocketmq-producer-group} rocketmq.producer.group=unique-producer-group
austin-rocketmq-biz-consumer-group=unique-biz-consumer-group
austin-rocketmq-recall-consumer-group=unique-recall-consumer-group
##################### Rabbit properties ##################### ##################### Rabbit properties #####################
server.port=8080
spring.application.name=cl
#RabbitMq所在服务器IP #RabbitMq所在服务器IP
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.host=${austin-rabbitmq-ip}
#连接端口号 #连接端口号
spring.rabbitmq.port=5672 spring.rabbitmq.port=${austin-rabbitmq-port}
server.port=8080
spring.application.name=cl
#用户名 #用户名
spring.rabbitmq.username=root spring.rabbitmq.username=root
#用户密码 #用户密码
@ -132,3 +139,4 @@ management.endpoint.metrics.enabled=true
management.endpoint.prometheus.enabled=true management.endpoint.prometheus.enabled=true
management.endpoints.web.exposure.include=* management.endpoints.web.exposure.include=*
management.metrics.export.prometheus.enabled=true management.metrics.export.prometheus.enabled=true
management.health.rabbit.enabled=false

Loading…
Cancel
Save