From 77c895c48a0a54d9d295c418b543d69421966c53 Mon Sep 17 00:00:00 2001 From: TOM Date: Thu, 21 Jul 2022 18:02:46 +0800 Subject: [PATCH 1/2] =?UTF-8?q?master:=E5=8D=95=E6=9C=BA=E9=99=90=E6=B5=81?= =?UTF-8?q?=E8=A7=A3=E8=80=A6=20=E4=BF=AE=E6=94=B9=E9=83=A8=E5=88=86?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cron/service/impl/TaskHandlerImpl.java | 6 +-- .../deduplication/DeduplicationHolder.java | 4 +- .../limit/SimpleLimitService.java | 4 +- ...rviceImpl.java => FlowControlFactory.java} | 53 +++++++++++++------ .../flowcontrol/FlowControlService.java | 2 +- .../annotations/LocalRateLimit.java | 22 ++++++++ .../impl/RequestRateLimitService.java | 28 ++++++++++ .../impl/SendUserNumRateLimitService.java | 28 ++++++++++ .../austin/handler/handler/BaseHandler.java | 11 ++-- .../shield/impl/ShieldServiceImpl.java | 2 +- 10 files changed, 129 insertions(+), 31 deletions(-) rename austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/{impl/FlowControlServiceImpl.java => FlowControlFactory.java} (52%) create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/annotations/LocalRateLimit.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/RequestRateLimitService.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/SendUserNumRateLimitService.java diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java b/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java index 9c52acb..1408fac 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java @@ -36,7 +36,7 @@ public class TaskHandlerImpl implements TaskHandler { public void handle(Long messageTemplateId) { MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get(); - if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) { + if (StrUtil.isBlank(messageTemplate.getCronCrowdPath())) { log.error("TaskHandler#handle crowdPath empty! messageTemplateId:{}", messageTemplateId); return; } @@ -48,14 +48,14 @@ public class TaskHandlerImpl implements TaskHandler { CrowdBatchTaskPending crowdBatchTaskPending = context.getBean(CrowdBatchTaskPending.class); ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> { if (CollUtil.isEmpty(row.getFieldMap()) - || StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) { + || StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) { return; } // 3. 每一行处理交给LazyPending HashMap params = ReadFileUtils.getParamFromLine(row.getFieldMap()); CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY)) - .params(params).messageTemplateId(messageTemplateId).build(); + .params(params).messageTemplateId(messageTemplateId).build(); crowdBatchTaskPending.pending(crowdInfoVo); // 4. 判断是否读取文件完成回收资源且更改状态 diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationHolder.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationHolder.java index 04902db..9afa064 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationHolder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationHolder.java @@ -15,8 +15,8 @@ import java.util.Map; @Service public class DeduplicationHolder { - private Map builderHolder = new HashMap<>(4); - private Map serviceHolder = new HashMap<>(4); + private final Map builderHolder = new HashMap<>(4); + private final Map serviceHolder = new HashMap<>(4); public Builder selectBuilder(Integer key) { return builderHolder.get(key); diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SimpleLimitService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SimpleLimitService.java index 58f042b..c543a5f 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SimpleLimitService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SimpleLimitService.java @@ -29,7 +29,7 @@ public class SimpleLimitService extends AbstractLimitService { public Set limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) { Set filterReceiver = new HashSet<>(taskInfo.getReceiver().size()); // 获取redis记录 - Map readyPutRedisReceiver = new HashMap(taskInfo.getReceiver().size()); + Map readyPutRedisReceiver = new HashMap<>(taskInfo.getReceiver().size()); //redis数据隔离 List keys = deduplicationAllKey(service, taskInfo).stream().map(key -> LIMIT_TAG + key).collect(Collectors.toList()); Map inRedisValue = redisUtils.mGet(keys); @@ -59,7 +59,7 @@ public class SimpleLimitService extends AbstractLimitService { * @param readyPutRedisReceiver */ private void putInRedis(Map readyPutRedisReceiver, - Map inRedisValue, Long deduplicationTime) { + Map inRedisValue, Long deduplicationTime) { Map keyValues = new HashMap<>(readyPutRedisReceiver.size()); for (Map.Entry entry : readyPutRedisReceiver.entrySet()) { String key = entry.getValue(); diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/FlowControlServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlFactory.java similarity index 52% rename from austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/FlowControlServiceImpl.java rename to austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlFactory.java index 7c2e5c9..92b22da 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/FlowControlServiceImpl.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlFactory.java @@ -1,4 +1,4 @@ -package com.java3y.austin.handler.flowcontrol.impl; +package com.java3y.austin.handler.flowcontrol; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; @@ -7,11 +7,18 @@ import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.handler.enums.RateLimitStrategy; -import com.java3y.austin.handler.flowcontrol.FlowControlParam; -import com.java3y.austin.handler.flowcontrol.FlowControlService; +import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit; import com.java3y.austin.support.service.ConfigService; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; /** @@ -20,22 +27,26 @@ import org.springframework.stereotype.Service; */ @Service @Slf4j -public class FlowControlServiceImpl implements FlowControlService { +public class FlowControlFactory implements ApplicationContextAware { private static final String FLOW_CONTROL_KEY = "flowControlRule"; private static final String FLOW_CONTROL_PREFIX = "flow_control_"; + private final Map flowControlServiceMap = new ConcurrentHashMap<>(); + @Autowired private ConfigService config; + private ApplicationContext applicationContext; @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) { - RateLimiter rateLimiter = flowControlParam.getRateLimiter(); + RateLimiter rateLimiter; Double rateInitValue = flowControlParam.getRateInitValue(); - - double costTime = 0; - // 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准 Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel()); if (rateLimitConfig != null && !rateInitValue.equals(rateLimitConfig)) { @@ -43,16 +54,15 @@ public class FlowControlServiceImpl implements FlowControlService { flowControlParam.setRateInitValue(rateLimitConfig); flowControlParam.setRateLimiter(rateLimiter); } - if (RateLimitStrategy.REQUEST_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { - costTime = rateLimiter.acquire(1); + FlowControlService flowControlService = flowControlServiceMap.get(flowControlParam.getRateLimitStrategy()); + if (Objects.isNull(flowControlService)) { + log.error("没有找到对应的单机限流策略"); + return; } - if (RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { - costTime = rateLimiter.acquire(taskInfo.getReceiver().size()); - } - + double costTime = flowControlService.flowControl(taskInfo, flowControlParam); if (costTime > 0) { log.info("consumer {} flow control time {}", - ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime); + ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime); } } @@ -73,4 +83,17 @@ public class FlowControlServiceImpl implements FlowControlService { } return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode); } + + @PostConstruct + private void init() { + Map serviceMap = this.applicationContext.getBeansWithAnnotation(LocalRateLimit.class); + serviceMap.forEach((name, service) -> { + if (service instanceof FlowControlService) { + LocalRateLimit localRateLimit = AopUtils.getTargetClass(service).getAnnotation(LocalRateLimit.class); + RateLimitStrategy rateLimitStrategy = localRateLimit.rateLimitStrategy(); + //通常情况下 实现的限流service与rateLimitStrategy一一对应 + flowControlServiceMap.put(rateLimitStrategy, (FlowControlService) service); + } + }); + } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java index 8986c6a..f780a2b 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java @@ -15,6 +15,6 @@ public interface FlowControlService { * @param taskInfo * @param flowControlParam */ - void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam); + Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/annotations/LocalRateLimit.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/annotations/LocalRateLimit.java new file mode 100644 index 0000000..25a4145 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/annotations/LocalRateLimit.java @@ -0,0 +1,22 @@ +package com.java3y.austin.handler.flowcontrol.annotations; + +import com.java3y.austin.handler.enums.RateLimitStrategy; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.springframework.stereotype.Service; + +/** + * 单机限流注解 + * Created by TOM + * On 2022/7/21 17:03 + */ +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Service +public @interface LocalRateLimit { + RateLimitStrategy rateLimitStrategy() default RateLimitStrategy.REQUEST_RATE_LIMIT; +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/RequestRateLimitService.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/RequestRateLimitService.java new file mode 100644 index 0000000..d2279d7 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/RequestRateLimitService.java @@ -0,0 +1,28 @@ +package com.java3y.austin.handler.flowcontrol.impl; + +import com.google.common.util.concurrent.RateLimiter; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import com.java3y.austin.handler.flowcontrol.FlowControlParam; +import com.java3y.austin.handler.flowcontrol.FlowControlService; +import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit; + +/** + * Created by TOM + * On 2022/7/21 17:05 + */ +@LocalRateLimit(rateLimitStrategy = RateLimitStrategy.REQUEST_RATE_LIMIT) +public class RequestRateLimitService implements FlowControlService { + + /** + * 根据渠道进行流量控制 + * + * @param taskInfo + * @param flowControlParam + */ + @Override + public Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) { + RateLimiter rateLimiter = flowControlParam.getRateLimiter(); + return rateLimiter.acquire(1); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/SendUserNumRateLimitService.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/SendUserNumRateLimitService.java new file mode 100644 index 0000000..08b6ca1 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/SendUserNumRateLimitService.java @@ -0,0 +1,28 @@ +package com.java3y.austin.handler.flowcontrol.impl; + +import com.google.common.util.concurrent.RateLimiter; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import com.java3y.austin.handler.flowcontrol.FlowControlParam; +import com.java3y.austin.handler.flowcontrol.FlowControlService; +import com.java3y.austin.handler.flowcontrol.annotations.LocalRateLimit; + +/** + * Created by TOM + * On 2022/7/21 17:14 + */ +@LocalRateLimit(rateLimitStrategy = RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT) +public class SendUserNumRateLimitService implements FlowControlService { + + /** + * 根据渠道进行流量控制 + * + * @param taskInfo + * @param flowControlParam + */ + @Override + public Double flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) { + RateLimiter rateLimiter = flowControlParam.getRateLimiter(); + return rateLimiter.acquire(taskInfo.getReceiver().size()); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java index 236d1f8..1e70ddb 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java @@ -1,16 +1,13 @@ package com.java3y.austin.handler.handler; -import com.google.common.util.concurrent.RateLimiter; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; -import com.java3y.austin.handler.enums.RateLimitStrategy; +import com.java3y.austin.handler.flowcontrol.FlowControlFactory; import com.java3y.austin.handler.flowcontrol.FlowControlParam; -import com.java3y.austin.handler.flowcontrol.FlowControlService; import com.java3y.austin.support.utils.LogUtils; -import org.springframework.beans.factory.annotation.Autowired; - import javax.annotation.PostConstruct; +import org.springframework.beans.factory.annotation.Autowired; /** * @author 3y @@ -22,7 +19,7 @@ public abstract class BaseHandler implements Handler { @Autowired private LogUtils logUtils; @Autowired - private FlowControlService flowControlService; + private FlowControlFactory flowControlFactory; /** * 标识渠道的Code @@ -52,7 +49,7 @@ public abstract class BaseHandler implements Handler { public void flowControl(TaskInfo taskInfo) { // 只有子类指定了限流参数,才需要限流 if (flowControlParam != null) { - flowControlService.flowControl(taskInfo, flowControlParam); + flowControlFactory.flowControl(taskInfo, flowControlParam); } } @Override diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java index 6b8b0b0..1e5b7b7 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java @@ -50,7 +50,7 @@ public class ShieldServiceImpl implements ShieldService { } if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) { redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo, - new SerializerFeature[]{SerializerFeature.WriteClassName}), + SerializerFeature.WriteClassName), (DateUtil.offsetDay(new Date(), 1).getTime() / 1000) - DateUtil.currentSeconds()); logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); } From 5c160d47b45204add4c57b00b8e4a7a1b73169ca Mon Sep 17 00:00:00 2001 From: 3y Date: Wed, 27 Jul 2022 20:40:57 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/resources/application.properties | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index 6284a79..4a1393a 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -2,32 +2,31 @@ # TODO please replace 【must】 config value # TODO please replace 【must】 config value - - # todo [database] ip/port/username/password 【must】 -austin-database-ip= -austin-database-port= -austin-database-username= -austin-database-password= +austin-database-ip=localhost +austin-database-port=3306 +austin-database-username=root +austin-database-password=root123_A # todo [redis] ip/port/password【must】 -austin-redis-ip= -austin-redis-port= -austin-redis-password= +austin-redis-ip=localhost +austin-redis-port=5003 +austin-redis-password=austin -# TODO kafka/eventbus -austin-mq-pipeline=eventbus +# TODO choose : kafka/eventBus/rocketMq/rabbitMq +austin-mq-pipeline=eventBus # todo [kafka] ip/port【optional】, if austin-mq-pipeline=kafka 【must】 austin-kafka-ip= austin-kafka-port= -# todo [rocketmq] 【optional】, if austin-mq-pipeline=rocketMq【must】 -austin-rocketmq-nameserver-ip=127.0.0.1 -austin-rocketmq-nameserver-port=9876 -austin-rocketmq-producer-group=unique-producer-group -austin-rocketmq-biz-consumer-group=unique-biz-consumer-group -austin-rocketmq-recall-consumer-group=unique-recall-consumer-group +# todo [rocketMq] 【optional】, if austin-mq-pipeline=rocketMq【must】 +austin-rocketmq-nameserver-ip= +austin-rocketmq-nameserver-port= + +# todo [rabbitMq] 【optional】, if austin-mq-pipeline=rabbitMq【must】 +austin-rabbitmq-ip= +austin-rabbitmq-port= # todo [xxl-job] switch/ip/port/【optional】 xxl-job.enabled=false @@ -62,16 +61,22 @@ spring.kafka.consumer.auto-commit-interval=1000 spring.kafka.consumer.enable-auto-commit=true ##################### rocketmq properties ##################### + rocketmq.name-server=${austin-rocketmq-nameserver-ip}:${austin-rocketmq-nameserver-port} -rocketmq.producer.group=${austin-rocketmq-producer-group} +rocketmq.producer.group=unique-producer-group +austin-rocketmq-biz-consumer-group=unique-biz-consumer-group +austin-rocketmq-recall-consumer-group=unique-recall-consumer-group + + ##################### Rabbit properties ##################### -server.port=8080 -spring.application.name=cl #RabbitMq所在服务器IP -spring.rabbitmq.host=127.0.0.1 +spring.rabbitmq.host=${austin-rabbitmq-ip} #连接端口号 -spring.rabbitmq.port=5672 +spring.rabbitmq.port=${austin-rabbitmq-port} + +server.port=8080 +spring.application.name=cl #用户名 spring.rabbitmq.username=root #用户密码 @@ -134,3 +139,4 @@ management.endpoint.metrics.enabled=true management.endpoint.prometheus.enabled=true management.endpoints.web.exposure.include=* management.metrics.export.prometheus.enabled=true +management.health.rabbit.enabled=false