diff --git a/README.md b/README.md index 9b7a3bf..75297c7 100644 --- a/README.md +++ b/README.md @@ -122,18 +122,17 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co - [x] 企业微信渠道接入 - [x] 夜间屏蔽次日早晨推送(xxl-job定时任务框架,另类的延时队列) - [x] 钉钉渠道接入 -- [ ] 编写单测 -- [ ] 持续提高消息推送系统的影响力,让更多的业务方了解其功能,进而挖掘更多拉新和唤醒用户的玩法,提高站内的次留率和转化率 -- [ ] 优化代码 -- [ ] 接入微信服务号渠道 -- [ ] 接入微信小程序渠道 -- [ ] 接入PUSH渠道 -- [ ] 接入工作流引擎实现对消息工单审核 +- [x] 单机限流实现 +- [x] 引入单测框架,编写部分单测用例 +- [x] 接入微信服务号渠道(已有pull request代码) +- [x] 接入微信小程序渠道(已有pull request代码) +- [x] 接入PUSH渠道 +- [ ] 总体架构已完成,持续做基础建设和优化代码 -**近期更新时间**:2022年3月30日 +**近期更新时间**:5月9号 -**近期更新功能**:钉钉群自定义机器人与工作消息渠道接入完成 +**近期更新功能**:接入个推PUSH,安卓发送推送消息 ## 项目交流 diff --git a/austin-common/src/main/java/com/java3y/austin/common/constant/SendAccountConstant.java b/austin-common/src/main/java/com/java3y/austin/common/constant/SendAccountConstant.java index f5aa376..5f3c807 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/constant/SendAccountConstant.java +++ b/austin-common/src/main/java/com/java3y/austin/common/constant/SendAccountConstant.java @@ -12,6 +12,13 @@ package com.java3y.austin.common.constant; */ public class SendAccountConstant { + /** + * 账号约定:所有的账号都从10开始,步长为10 + */ + public static final Integer START = 10; + public static final Integer STEP = 10; + + /** * 钉钉 工作应用消息 账号 */ @@ -19,6 +26,14 @@ public class SendAccountConstant { public static final String DING_DING_WORK_NOTICE_PREFIX = "ding_ding_work_notice_"; public static final String DING_DING_ACCESS_TOKEN_PREFIX = "ding_ding_access_token_"; + /** + * 个推PUSH 消息账号 + */ + public static final String GE_TUI_ACCOUNT_KEY = "geTuiAccount"; + public static final String GE_TUI_ACCOUNT_PREFIX = "ge_tui_account_"; + public static final String GE_TUI_ACCESS_TOKEN_PREFIX = "ge_tui_access_token_"; + + /** * 邮件 账号 @@ -39,6 +54,17 @@ public class SendAccountConstant { public static final String ENTERPRISE_WECHAT_ACCOUNT_KEY = "enterpriseWechatAccount"; public static final String ENTERPRISE_WECHAT_PREFIX = "enterprise_wechat_"; + /** + * 微信服务号 应用消息 账号 + */ + public static final String WECHAT_OFFICIAL_ACCOUNT_KEY = "officialAccount"; + public static final String WECHAT_OFFICIAL__PREFIX = "official_"; + + /** + * 微信小程序 应用消息 账号 + */ + public static final String WECHAT_MINI_PROGRAM_ACCOUNT_KEY = "miniProgramAccount"; + public static final String WECHAT_MINI_PROGRAM_PREFIX = "mini_program_"; /** * 短信 账号 @@ -46,9 +72,5 @@ public class SendAccountConstant { public static final String SMS_ACCOUNT_KEY = "smsAccount"; public static final String SMS_PREFIX = "sms_"; - /** - * 账号约定:所有的账号都从10开始,步长为10 - */ - public static final Integer START = 10; - public static final Integer STEP = 10; + } diff --git a/austin-common/src/main/java/com/java3y/austin/common/dto/account/GeTuiAccount.java b/austin-common/src/main/java/com/java3y/austin/common/dto/account/GeTuiAccount.java new file mode 100644 index 0000000..994fbfe --- /dev/null +++ b/austin-common/src/main/java/com/java3y/austin/common/dto/account/GeTuiAccount.java @@ -0,0 +1,30 @@ +package com.java3y.austin.common.dto.account; + + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + + +/** + * 创建个推账号时的元信息 + * + * @author 3y + *

+ * (在调用个推的api时需要用到部分的参数) + *

+ * https://docs.getui.com/getui/start/devcenter/ + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class GeTuiAccount { + + private String appId; + + private String appKey; + + private String masterSecret; +} diff --git a/austin-common/src/main/java/com/java3y/austin/common/dto/account/WeChatMiniProgramAccount.java b/austin-common/src/main/java/com/java3y/austin/common/dto/account/WeChatMiniProgramAccount.java new file mode 100644 index 0000000..1e8d4e5 --- /dev/null +++ b/austin-common/src/main/java/com/java3y/austin/common/dto/account/WeChatMiniProgramAccount.java @@ -0,0 +1,44 @@ +package com.java3y.austin.common.dto.account; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * + * 小程序订阅消息参数 + *

+ * 参数示例: + * https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/subscribe-message/subscribeMessage.send.html + * * @author sunql + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class WeChatMiniProgramAccount { + + /** + * 订阅消息模板ID + */ + private String templateId; + + /** + * 跳转小程序类型:developer为开发版;trial为体验版;formal为正式版;默认为正式版 + */ + private String miniProgramState; + + /** + * 击模板卡片后的跳转页面,仅限本小程序内的页面。支持带参数,(示例index?foo=bar)。该字段不填则模板无跳转。 + */ + private String page; + + /** + * 账号相关 + */ + private String appId; + private String appSecret; + private String grantType; + +} diff --git a/austin-common/src/main/java/com/java3y/austin/common/dto/account/WechatOfficialAccount.java b/austin-common/src/main/java/com/java3y/austin/common/dto/account/WeChatOfficialAccount.java similarity index 74% rename from austin-common/src/main/java/com/java3y/austin/common/dto/account/WechatOfficialAccount.java rename to austin-common/src/main/java/com/java3y/austin/common/dto/account/WeChatOfficialAccount.java index 1dfecc0..3f1a1c6 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/dto/account/WechatOfficialAccount.java +++ b/austin-common/src/main/java/com/java3y/austin/common/dto/account/WeChatOfficialAccount.java @@ -19,17 +19,7 @@ import java.util.Map; @Builder @AllArgsConstructor @NoArgsConstructor -public class WechatOfficialAccount { - - /** - * 服务号关注者的openId - */ - private String openId; - - /** - * 需要使用的模板信息Id - */ - private String templateId; +public class WeChatOfficialAccount { /** * 模板消息跳转的url @@ -47,7 +37,9 @@ public class WechatOfficialAccount { private String path; /** - * 模板消息的信息载体 + * 账号相关 */ - private Map map; + private String appId; + private String secret; + private String templateId; } diff --git a/austin-common/src/main/java/com/java3y/austin/common/dto/model/MiniProgramContentModel.java b/austin-common/src/main/java/com/java3y/austin/common/dto/model/MiniProgramContentModel.java index 4601781..5e15ed3 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/dto/model/MiniProgramContentModel.java +++ b/austin-common/src/main/java/com/java3y/austin/common/dto/model/MiniProgramContentModel.java @@ -1,8 +1,22 @@ package com.java3y.austin.common.dto.model; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Map; + /** * @author 3y */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor public class MiniProgramContentModel extends ContentModel { - + /** + * 模板消息发送的数据 + */ + Map map; } diff --git a/austin-common/src/main/java/com/java3y/austin/common/dto/model/PushContentModel.java b/austin-common/src/main/java/com/java3y/austin/common/dto/model/PushContentModel.java index abb427d..511d436 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/dto/model/PushContentModel.java +++ b/austin-common/src/main/java/com/java3y/austin/common/dto/model/PushContentModel.java @@ -1,9 +1,23 @@ package com.java3y.austin.common.dto.model; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + /** * @author 3y + * + * 通知栏消息推送 */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor public class PushContentModel extends ContentModel { + private String title; + private String content; + private String url; } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/dto/getui/GeTuiTokenResultDTO.java b/austin-cron/src/main/java/com/java3y/austin/cron/dto/getui/GeTuiTokenResultDTO.java new file mode 100644 index 0000000..ef8de29 --- /dev/null +++ b/austin-cron/src/main/java/com/java3y/austin/cron/dto/getui/GeTuiTokenResultDTO.java @@ -0,0 +1,37 @@ +package com.java3y.austin.cron.dto.getui; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author 3y + * @date 2022/5/8 + *

+ * https://docs.getui.com/getui/server/rest_v2/token/ + */ +@NoArgsConstructor +@Data +@AllArgsConstructor +@Builder +public class GeTuiTokenResultDTO { + + + @JSONField(name = "msg") + private String msg; + @JSONField(name = "code") + private Integer code; + @JSONField(name = "data") + private DataDTO data; + + @NoArgsConstructor + @Data + public static class DataDTO { + @JSONField(name = "expire_time") + private String expireTime; + @JSONField(name = "token") + private String token; + } +} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/dto/getui/QueryTokenParamDTO.java b/austin-cron/src/main/java/com/java3y/austin/cron/dto/getui/QueryTokenParamDTO.java new file mode 100644 index 0000000..6a1a848 --- /dev/null +++ b/austin-cron/src/main/java/com/java3y/austin/cron/dto/getui/QueryTokenParamDTO.java @@ -0,0 +1,40 @@ +package com.java3y.austin.cron.dto.getui; + +import cn.hutool.crypto.SecureUtil; +import cn.hutool.http.ContentType; +import cn.hutool.http.Header; +import cn.hutool.http.HttpRequest; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + + +/** + * 请求token时的参数 + * @author 3y + * https://docs.getui.com/getui/server/rest_v2/token/ + */ +@NoArgsConstructor +@Data +@Builder +@AllArgsConstructor +public class QueryTokenParamDTO { + /** + * sign + */ + @JSONField(name = "sign") + private String sign; + /** + * timestamp + */ + @JSONField(name = "timestamp") + private String timestamp; + /** + * appkey + */ + @JSONField(name = "appkey") + private String appKey; +} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshDingDingAccessTokenHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshDingDingAccessTokenHandler.java index f2ce549..25f378e 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshDingDingAccessTokenHandler.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshDingDingAccessTokenHandler.java @@ -46,7 +46,7 @@ public class RefreshDingDingAccessTokenHandler { log.info("refreshAccessTokenJob#execute!"); SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> { for (int index = SendAccountConstant.START; true; index = index + SendAccountConstant.STEP) { - DingDingWorkNoticeAccount account = accountUtils.getAccount(index, SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, new DingDingWorkNoticeAccount()); + DingDingWorkNoticeAccount account = accountUtils.getAccount(index, SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, DingDingWorkNoticeAccount.class); if (account == null) { break; } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshGeTuiAccessTokenHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshGeTuiAccessTokenHandler.java new file mode 100644 index 0000000..b000df9 --- /dev/null +++ b/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshGeTuiAccessTokenHandler.java @@ -0,0 +1,91 @@ +package com.java3y.austin.cron.handler; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.SecureUtil; +import cn.hutool.http.ContentType; +import cn.hutool.http.Header; +import cn.hutool.http.HttpRequest; +import com.alibaba.fastjson.JSON; +import com.google.common.base.Throwables; +import com.java3y.austin.common.constant.SendAccountConstant; +import com.java3y.austin.common.dto.account.GeTuiAccount; +import com.java3y.austin.cron.dto.getui.QueryTokenParamDTO; +import com.java3y.austin.cron.dto.getui.GeTuiTokenResultDTO; +import com.java3y.austin.support.config.SupportThreadPoolConfig; +import com.java3y.austin.support.utils.AccountUtils; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + + +/** + * 刷新个推的token + *

+ * https://docs.getui.com/getui/server/rest_v2/token/ + * + * @author 3y + */ +@Service +@Slf4j +public class RefreshGeTuiAccessTokenHandler { + + @Autowired + private StringRedisTemplate redisTemplate; + + @Autowired + private AccountUtils accountUtils; + + /** + * 每小时请求一次接口刷新(以防失效) + */ + @XxlJob("refreshGeTuiAccessTokenJob") + public void execute() { + log.info("refreshGeTuiAccessTokenJob#execute!"); + SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> { + for (int index = SendAccountConstant.START; true; index = index + SendAccountConstant.STEP) { + GeTuiAccount account = accountUtils.getAccount(index, SendAccountConstant.GE_TUI_ACCOUNT_KEY, SendAccountConstant.GE_TUI_ACCOUNT_PREFIX, GeTuiAccount.class); + if (account == null) { + break; + } + String accessToken = getAccessToken(account); + if (StrUtil.isNotBlank(accessToken)) { + redisTemplate.opsForValue().set(SendAccountConstant.GE_TUI_ACCESS_TOKEN_PREFIX + index, accessToken); + } + } + }); + } + + /** + * 获取 access_token + * + * @param account + * @return + */ + private String getAccessToken(GeTuiAccount account) { + String accessToken = ""; + try { + String url = "https://restapi.getui.com/v2/" + account.getAppId() + "/auth"; + String time = String.valueOf(System.currentTimeMillis()); + String digest = SecureUtil.sha256().digestHex(account.getAppKey() + time + account.getMasterSecret()); + QueryTokenParamDTO param = QueryTokenParamDTO.builder() + .timestamp(time) + .appKey(account.getAppKey()) + .sign(digest).build(); + + String body = HttpRequest.post(url).header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue()) + .body(JSON.toJSONString(param)) + .timeout(20000) + .execute().body(); + GeTuiTokenResultDTO geTuiTokenResultDTO = JSON.parseObject(body, GeTuiTokenResultDTO.class); + if (geTuiTokenResultDTO.getCode().equals(0)) { + accessToken = geTuiTokenResultDTO.getData().getToken(); + } + } catch (Exception e) { + log.error("RefreshGeTuiAccessTokenHandler#getAccessToken fail:{}", Throwables.getStackTraceAsString(e)); + } + return accessToken; + } + +} diff --git a/austin-handler/pom.xml b/austin-handler/pom.xml index 6fdac1a..7f542c6 100644 --- a/austin-handler/pom.xml +++ b/austin-handler/pom.xml @@ -46,6 +46,11 @@ com.github.binarywang weixin-java-mp + + + com.github.binarywang + weixin-java-miniapp + @@ -54,4 +59,4 @@ - \ No newline at end of file + diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/domain/DeduplicationParam.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationParam.java similarity index 94% rename from austin-handler/src/main/java/com/java3y/austin/handler/domain/DeduplicationParam.java rename to austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationParam.java index e6c98c6..fe74595 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/domain/DeduplicationParam.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationParam.java @@ -1,4 +1,4 @@ -package com.java3y.austin.handler.domain; +package com.java3y.austin.handler.deduplication; import com.alibaba.fastjson.annotation.JSONField; import com.java3y.austin.common.domain.TaskInfo; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationRuleService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationRuleService.java index 0577827..24cb518 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationRuleService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/DeduplicationRuleService.java @@ -5,7 +5,6 @@ import com.ctrip.framework.apollo.spring.annotation.ApolloConfig; import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.DeduplicationType; -import com.java3y.austin.handler.domain.DeduplicationParam; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/AbstractDeduplicationBuilder.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/AbstractDeduplicationBuilder.java index 722317f..f69c2a8 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/AbstractDeduplicationBuilder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/AbstractDeduplicationBuilder.java @@ -3,7 +3,7 @@ package com.java3y.austin.handler.deduplication.build; import com.alibaba.fastjson.JSONObject; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.handler.deduplication.DeduplicationHolder; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.PostConstruct; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/Builder.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/Builder.java index abe55e8..8631037 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/Builder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/Builder.java @@ -1,7 +1,7 @@ package com.java3y.austin.handler.deduplication.build; import com.java3y.austin.common.domain.TaskInfo; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; /** * @author luohaojie diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/ContentDeduplicationBuilder.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/ContentDeduplicationBuilder.java index 11094a2..358c11e 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/ContentDeduplicationBuilder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/ContentDeduplicationBuilder.java @@ -3,7 +3,7 @@ package com.java3y.austin.handler.deduplication.build; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.common.enums.DeduplicationType; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; import org.springframework.stereotype.Service; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/FrequencyDeduplicationBuilder.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/FrequencyDeduplicationBuilder.java index ae4603a..3d83962 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/FrequencyDeduplicationBuilder.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/build/FrequencyDeduplicationBuilder.java @@ -4,7 +4,7 @@ import cn.hutool.core.date.DateUtil; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.common.enums.DeduplicationType; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; import org.springframework.stereotype.Service; import java.util.Date; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/AbstractLimitService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/AbstractLimitService.java new file mode 100644 index 0000000..432022e --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/AbstractLimitService.java @@ -0,0 +1,36 @@ +package com.java3y.austin.handler.deduplication.limit; + +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService; + +import java.util.*; + +/** + * @author cao + * @date 2022-04-20 12:00 + */ +public abstract class AbstractLimitService implements LimitService { + + + /** + * 获取得到当前消息模板所有的去重Key + * + * @param taskInfo + * @return + */ + protected List deduplicationAllKey(AbstractDeduplicationService service, TaskInfo taskInfo) { + List result = new ArrayList<>(taskInfo.getReceiver().size()); + for (String receiver : taskInfo.getReceiver()) { + String key = deduplicationSingleKey(service, taskInfo, receiver); + result.add(key); + } + return result; + } + + + protected String deduplicationSingleKey(AbstractDeduplicationService service, TaskInfo taskInfo, String receiver) { + + return service.deduplicationSingleKey(taskInfo, receiver); + + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/LimitService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/LimitService.java new file mode 100644 index 0000000..65d8b04 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/LimitService.java @@ -0,0 +1,25 @@ +package com.java3y.austin.handler.deduplication.limit; + +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.deduplication.DeduplicationParam; +import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService; + +import java.util.Set; + +/** + * @author cao + * @date 2022-04-20 11:58 + */ +public interface LimitService { + + + /** + * 去重限制 + * @param service 去重器对象 + * @param taskInfo + * @param param 去重参数 + * @return 返回不符合条件的手机号码 + */ + Set limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param); + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SimpleLimitService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SimpleLimitService.java new file mode 100644 index 0000000..2976805 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SimpleLimitService.java @@ -0,0 +1,77 @@ +package com.java3y.austin.handler.deduplication.limit; + +import cn.hutool.core.collection.CollUtil; +import com.java3y.austin.common.constant.AustinConstant; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.deduplication.DeduplicationParam; +import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService; +import com.java3y.austin.support.utils.RedisUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * 简单去重器(目前承载着 N分钟相同内容去重) + * @author cao + * @date 2022-04-20 13:41 + */ +@Service(value = "SimpleLimitService") +public class SimpleLimitService extends AbstractLimitService { + + private static final String LIMIT_TAG = "SP_"; + + @Autowired + private RedisUtils redisUtils; + + @Override + public Set limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) { + Set filterReceiver = new HashSet<>(taskInfo.getReceiver().size()); + // 获取redis记录 + Map readyPutRedisReceiver = new HashMap(taskInfo.getReceiver().size()); + //redis数据隔离 + List keys = deduplicationAllKey(service, taskInfo).stream().map(key -> LIMIT_TAG + key).collect(Collectors.toList()); + Map inRedisValue = redisUtils.mGet(keys); + + for (String receiver : taskInfo.getReceiver()) { + String key = LIMIT_TAG + deduplicationSingleKey(service, taskInfo, receiver); + String value = inRedisValue.get(key); + + // 符合条件的用户 + if (value != null && Integer.parseInt(value) >= param.getCountNum()) { + filterReceiver.add(receiver); + } else { + readyPutRedisReceiver.put(receiver, key); + } + } + + // 不符合条件的用户:需要更新Redis(无记录添加,有记录则累加次数) + putInRedis(readyPutRedisReceiver, inRedisValue, param.getDeduplicationTime()); + + return filterReceiver; + } + + + /** + * 存入redis 实现去重 + * + * @param readyPutRedisReceiver + */ + private void putInRedis(Map readyPutRedisReceiver, + Map inRedisValue, Long deduplicationTime) { + Map keyValues = new HashMap<>(readyPutRedisReceiver.size()); + for (Map.Entry entry : readyPutRedisReceiver.entrySet()) { + String key = entry.getValue(); + if (inRedisValue.get(key) != null) { + keyValues.put(key, String.valueOf(Integer.valueOf(inRedisValue.get(key)) + 1)); + } else { + keyValues.put(key, String.valueOf(AustinConstant.TRUE)); + } + } + if (CollUtil.isNotEmpty(keyValues)) { + redisUtils.pipelineSetEx(keyValues, deduplicationTime); + } + } + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SlideWindowLimitService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SlideWindowLimitService.java new file mode 100644 index 0000000..392462d --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/limit/SlideWindowLimitService.java @@ -0,0 +1,69 @@ +package com.java3y.austin.handler.deduplication.limit; + +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.deduplication.DeduplicationParam; +import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService; +import com.java3y.austin.support.utils.RedisUtils; +import com.java3y.austin.support.utils.SnowFlakeIdUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.ClassPathResource; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.scripting.support.ResourceScriptSource; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * 滑动窗口去重器(目前承载着 一天内N次相同渠道去重) + * @author cao + * @date 2022-04-20 11:34 + */ +@Service(value = "SlideWindowLimitService") +public class SlideWindowLimitService extends AbstractLimitService { + + private static final String LIMIT_TAG = "SW_"; + + @Autowired + private RedisUtils redisUtils; + + private SnowFlakeIdUtils snowFlakeIdUtils = new SnowFlakeIdUtils(1, 1); + + private DefaultRedisScript redisScript; + + + @PostConstruct + public void init() { + redisScript = new DefaultRedisScript(); + redisScript.setResultType(Long.class); + redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("limit.lua"))); + } + + + /** + * @param service 去重器对象 + * @param taskInfo + * @param param 去重参数 + * @return 返回不符合条件的手机号码 + */ + @Override + public Set limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) { + + Set filterReceiver = new HashSet<>(taskInfo.getReceiver().size()); + long nowTime = System.currentTimeMillis(); + for (String receiver : taskInfo.getReceiver()) { + String key = LIMIT_TAG + deduplicationSingleKey(service, taskInfo, receiver); + String scoreValue = String.valueOf(snowFlakeIdUtils.nextId()); + String score = String.valueOf(nowTime); + if (redisUtils.execLimitLua(redisScript, Arrays.asList(key), String.valueOf(param.getDeduplicationTime() * 1000), score, String.valueOf(param.getCountNum()), scoreValue)) { + filterReceiver.add(receiver); + } + + } + return filterReceiver; + } + + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java index 9c0bbc4..5937374 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java @@ -1,13 +1,12 @@ package com.java3y.austin.handler.deduplication.service; import cn.hutool.core.collection.CollUtil; -import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.handler.deduplication.DeduplicationHolder; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; +import com.java3y.austin.handler.deduplication.limit.LimitService; import com.java3y.austin.support.utils.LogUtils; -import com.java3y.austin.support.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -21,8 +20,11 @@ import java.util.*; */ @Slf4j public abstract class AbstractDeduplicationService implements DeduplicationService { + protected Integer deduplicationType; + protected LimitService limitService; + @Autowired private DeduplicationHolder deduplicationHolder; @@ -31,8 +33,6 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi deduplicationHolder.putService(deduplicationType, this); } - @Autowired - private RedisUtils redisUtils; @Autowired private LogUtils logUtils; @@ -40,27 +40,8 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi @Override public void deduplication(DeduplicationParam param) { TaskInfo taskInfo = param.getTaskInfo(); - Set filterReceiver = new HashSet<>(taskInfo.getReceiver().size()); - - // 获取redis记录 - Set readyPutRedisReceiver = new HashSet<>(taskInfo.getReceiver().size()); - List keys = deduplicationAllKey(taskInfo); - Map inRedisValue = redisUtils.mGet(keys); - - for (String receiver : taskInfo.getReceiver()) { - String key = deduplicationSingleKey(taskInfo, receiver); - String value = inRedisValue.get(key); - - // 符合条件的用户 - if (value != null && Integer.parseInt(value) >= param.getCountNum()) { - filterReceiver.add(receiver); - } else { - readyPutRedisReceiver.add(receiver); - } - } - // 不符合条件的用户:需要更新Redis(无记录添加,有记录则累加次数) - putInRedis(readyPutRedisReceiver, inRedisValue, param); + Set filterReceiver = limitService.limitFilter(this, taskInfo, param); // 剔除符合去重条件的用户 if (CollUtil.isNotEmpty(filterReceiver)) { @@ -77,44 +58,7 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi * @param receiver * @return */ - protected abstract String deduplicationSingleKey(TaskInfo taskInfo, String receiver); - - - /** - * 存入redis 实现去重 - * - * @param readyPutRedisReceiver - */ - private void putInRedis(Set readyPutRedisReceiver, - Map inRedisValue, DeduplicationParam param) { - Map keyValues = new HashMap<>(readyPutRedisReceiver.size()); - for (String receiver : readyPutRedisReceiver) { - String key = deduplicationSingleKey(param.getTaskInfo(), receiver); - if (inRedisValue.get(key) != null) { - keyValues.put(key, String.valueOf(Integer.valueOf(inRedisValue.get(key)) + 1)); - } else { - keyValues.put(key, String.valueOf(AustinConstant.TRUE)); - } - } - if (CollUtil.isNotEmpty(keyValues)) { - redisUtils.pipelineSetEx(keyValues, param.getDeduplicationTime()); - } - } - - /** - * 获取得到当前消息模板所有的去重Key - * - * @param taskInfo - * @return - */ - private List deduplicationAllKey(TaskInfo taskInfo) { - List result = new ArrayList<>(taskInfo.getReceiver().size()); - for (String receiver : taskInfo.getReceiver()) { - String key = deduplicationSingleKey(taskInfo, receiver); - result.add(key); - } - return result; - } + public abstract String deduplicationSingleKey(TaskInfo taskInfo, String receiver); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/ContentDeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/ContentDeduplicationService.java index ca14bb7..6daaddd 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/ContentDeduplicationService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/ContentDeduplicationService.java @@ -4,6 +4,9 @@ import cn.hutool.crypto.digest.DigestUtil; import com.alibaba.fastjson.JSON; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.DeduplicationType; +import com.java3y.austin.handler.deduplication.limit.LimitService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; /** @@ -14,7 +17,10 @@ import org.springframework.stereotype.Service; @Service public class ContentDeduplicationService extends AbstractDeduplicationService { - public ContentDeduplicationService() { + + @Autowired + public ContentDeduplicationService(@Qualifier("SlideWindowLimitService") LimitService limitService) { + this.limitService = limitService; deduplicationType = DeduplicationType.CONTENT.getCode(); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/DeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/DeduplicationService.java index 6eac046..a34a0b6 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/DeduplicationService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/DeduplicationService.java @@ -1,7 +1,7 @@ package com.java3y.austin.handler.deduplication.service; -import com.java3y.austin.handler.domain.DeduplicationParam; +import com.java3y.austin.handler.deduplication.DeduplicationParam; /** * @author huskey diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/FrequencyDeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/FrequencyDeduplicationService.java index 102a93b..9091955 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/FrequencyDeduplicationService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/FrequencyDeduplicationService.java @@ -3,8 +3,12 @@ package com.java3y.austin.handler.deduplication.service; import cn.hutool.core.util.StrUtil; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.DeduplicationType; +import com.java3y.austin.handler.deduplication.limit.LimitService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; + /** * @author 3y * @date 2021/12/12 @@ -13,8 +17,13 @@ import org.springframework.stereotype.Service; @Service public class FrequencyDeduplicationService extends AbstractDeduplicationService { - public FrequencyDeduplicationService() { + + @Autowired + public FrequencyDeduplicationService(@Qualifier("SimpleLimitService") LimitService limitService) { + + this.limitService = limitService; deduplicationType = DeduplicationType.FREQUENCY.getCode(); + } private static final String PREFIX = "FRE"; @@ -33,7 +42,7 @@ public class FrequencyDeduplicationService extends AbstractDeduplicationService @Override public String deduplicationSingleKey(TaskInfo taskInfo, String receiver) { return PREFIX + StrUtil.C_UNDERLINE - + receiver + StrUtil.C_UNDERLINE + + receiver + StrUtil.C_UNDERLINE + taskInfo.getMessageTemplateId() + StrUtil.C_UNDERLINE + taskInfo.getSendChannel(); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/PushParam.java b/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/PushParam.java new file mode 100644 index 0000000..5ab85ca --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/PushParam.java @@ -0,0 +1,33 @@ +package com.java3y.austin.handler.domain.push; + + +import com.java3y.austin.common.domain.TaskInfo; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Set; + +@NoArgsConstructor +@AllArgsConstructor +@Data +@Builder +public class PushParam { + + /** + * 调用 接口时需要的token + */ + private String token; + + /** + * 调用接口时需要的appId + */ + private String appId; + + /** + * 消息模板的信息 + */ + private TaskInfo taskInfo; + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/getui/BatchSendPushParam.java b/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/getui/BatchSendPushParam.java new file mode 100644 index 0000000..3afbb0a --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/getui/BatchSendPushParam.java @@ -0,0 +1,54 @@ +package com.java3y.austin.handler.domain.push.getui; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Set; + + +/** + * 批量推送消息的param + * + * @author 3y + * https://docs.getui.com/getui/server/rest_v2/push/ + */ +@NoArgsConstructor +@AllArgsConstructor +@Data +@Builder +public class BatchSendPushParam { + + /** + * audience + */ + @JSONField(name = "audience") + private AudienceVO audience; + /** + * taskid + */ + @JSONField(name = "taskid") + private String taskId; + /** + * isAsync + */ + @JSONField(name = "is_async") + private Boolean isAsync; + + /** + * AudienceVO + */ + @NoArgsConstructor + @Data + @Builder + @AllArgsConstructor + public static class AudienceVO { + /** + * cid + */ + @JSONField(name = "cid") + private Set cid; + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/getui/SendPushParam.java b/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/getui/SendPushParam.java new file mode 100644 index 0000000..664d748 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/getui/SendPushParam.java @@ -0,0 +1,115 @@ +package com.java3y.austin.handler.domain.push.getui; + +import com.alibaba.fastjson.annotation.JSONField; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Set; + +/** + * 推送消息的param + * @author 3y + * https://docs.getui.com/getui/server/rest_v2/push/ + */ +@NoArgsConstructor +@AllArgsConstructor +@Data +@Builder +public class SendPushParam { + + /** + * requestId + */ + @JSONField(name = "request_id") + private String requestId; + /** + * settings + */ + @JSONField(name = "settings") + private SettingsVO settings; + /** + * audience + */ + @JSONField(name = "audience") + private AudienceVO audience; + /** + * pushMessage + */ + @JSONField(name = "push_message") + private PushMessageVO pushMessage; + + /** + * SettingsVO + */ + @NoArgsConstructor + @Data + public static class SettingsVO { + /** + * ttl + */ + @JSONField(name = "ttl") + private Integer ttl; + } + + /** + * AudienceVO + */ + @NoArgsConstructor + @Data + @AllArgsConstructor + @Builder + public static class AudienceVO { + /** + * cid + */ + @JSONField(name = "cid") + private Set cid; + } + + /** + * PushMessageVO + */ + @NoArgsConstructor + @Data + @AllArgsConstructor + @Builder + public static class PushMessageVO { + /** + * notification + */ + @JSONField(name = "notification") + private NotificationVO notification; + + /** + * NotificationVO + */ + @NoArgsConstructor + @Data + @AllArgsConstructor + @Builder + public static class NotificationVO { + /** + * title + */ + @JSONField(name = "title") + private String title; + /** + * body + */ + @JSONField(name = "body") + private String body; + /** + * clickType + */ + @JSONField(name = "click_type") + private String clickType; + /** + * url + */ + @JSONField(name = "url") + private String url; + } + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/getui/SendPushResult.java b/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/getui/SendPushResult.java new file mode 100644 index 0000000..5175d27 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/domain/push/getui/SendPushResult.java @@ -0,0 +1,38 @@ +package com.java3y.austin.handler.domain.push.getui; + + +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.annotation.JSONField; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + + +/** + * 发送消息后的返回值 + * @author 3y + * https://docs.getui.com/getui/server/rest_v2/common_args/?id=doc-title-1 + */ +@Builder +@Data +@AllArgsConstructor +@NoArgsConstructor +public class SendPushResult { + /** + * msg + */ + @JSONField(name = "msg") + private String msg; + /** + * code + */ + @JSONField(name = "code") + private Integer code; + /** + * data + */ + @JSONField(name = "data") + private JSONObject data; + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/domain/wechat/WeChatMiniProgramParam.java b/austin-handler/src/main/java/com/java3y/austin/handler/domain/wechat/WeChatMiniProgramParam.java new file mode 100644 index 0000000..54bab4f --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/domain/wechat/WeChatMiniProgramParam.java @@ -0,0 +1,38 @@ +package com.java3y.austin.handler.domain.wechat; + +import lombok.Builder; +import lombok.Data; + +import java.util.Map; +import java.util.Set; + +/** + * @author sunql + * @date 2022年05月06日 15:56 + * + * 小程序参数 + */ +@Data +@Builder +public class WeChatMiniProgramParam { + /** + * 业务Id + */ + private Long messageTemplateId; + + /** + * 发送账号 + */ + private Integer sendAccount; + + /** + * 接收者(用户)的 openid + */ + private Set openIds; + + /** + * 模板内容,格式形如 { "key1": { "value": any }, "key2": { "value": any } } + */ + private Map data; + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/domain/wechat/WeChatOfficialParam.java b/austin-handler/src/main/java/com/java3y/austin/handler/domain/wechat/WeChatOfficialParam.java new file mode 100644 index 0000000..ecd134c --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/domain/wechat/WeChatOfficialParam.java @@ -0,0 +1,37 @@ +package com.java3y.austin.handler.domain.wechat; + +import lombok.Builder; +import lombok.Data; + +import java.util.Map; +import java.util.Set; + +/** + * @author sunql + * @date 2022年05月06日 9:56 + * + * 服务号参数 + */ +@Data +@Builder +public class WeChatOfficialParam { + /** + * 业务Id + */ + private Long messageTemplateId; + + /** + * 关注服务号得用户 + */ + private Set openIds; + + /** + * 模板消息的信息载体 + */ + private Map data; + + /** + * 发送账号 + */ + private Integer sendAccount; +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/enums/RateLimitStrategy.java b/austin-handler/src/main/java/com/java3y/austin/handler/enums/RateLimitStrategy.java new file mode 100644 index 0000000..80ac3f1 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/enums/RateLimitStrategy.java @@ -0,0 +1,26 @@ +package com.java3y.austin.handler.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +/** + * 限流枚举 + * + * @author 3y + */ +@Getter +@ToString +@AllArgsConstructor +public enum RateLimitStrategy { + + + REQUEST_RATE_LIMIT(10, "根据真实请求数限流"), + SEND_USER_NUM_RATE_LIMIT(20, "根据发送用户数限流"), + ; + + private Integer code; + private String description; + + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlParam.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlParam.java new file mode 100644 index 0000000..9dfa9af --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlParam.java @@ -0,0 +1,39 @@ +package com.java3y.austin.handler.flowcontrol; + +import com.google.common.util.concurrent.RateLimiter; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author 3y + * @date 2022/4/18 + *

+ * 流量控制所需要的参数 + */ +@Builder +@Data +@AllArgsConstructor +@NoArgsConstructor +public class FlowControlParam { + + /** + * 限流器 + * 子类初始化的时候指定 + */ + protected RateLimiter rateLimiter; + + /** + * 限流器初始限流大小 + * 子类初始化的时候指定 + */ + protected Double rateInitValue; + + /** + * 限流的策略 + * 子类初始化的时候指定 + */ + protected RateLimitStrategy rateLimitStrategy; +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java new file mode 100644 index 0000000..8986c6a --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/FlowControlService.java @@ -0,0 +1,20 @@ +package com.java3y.austin.handler.flowcontrol; + +import com.java3y.austin.common.domain.TaskInfo; + +/** + * @author 3y + * 流量控制服务 + */ +public interface FlowControlService { + + + /** + * 根据渠道进行流量控制 + * + * @param taskInfo + * @param flowControlParam + */ + void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam); + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/FlowControlServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/FlowControlServiceImpl.java new file mode 100644 index 0000000..5713a2c --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/flowcontrol/impl/FlowControlServiceImpl.java @@ -0,0 +1,77 @@ +package com.java3y.austin.handler.flowcontrol.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.ctrip.framework.apollo.Config; +import com.ctrip.framework.apollo.spring.annotation.ApolloConfig; +import com.google.common.util.concurrent.RateLimiter; +import com.java3y.austin.common.constant.AustinConstant; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.common.enums.ChannelType; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import com.java3y.austin.handler.flowcontrol.FlowControlParam; +import com.java3y.austin.handler.flowcontrol.FlowControlService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +/** + * @author 3y + * @date 2022/4/18 + */ +@Service +@Slf4j +public class FlowControlServiceImpl implements FlowControlService { + + private static final String FLOW_CONTROL_KEY = "flowControl"; + + private static final String FLOW_CONTROL_PREFIX = "flow_control_"; + + @ApolloConfig("boss.austin") + private Config config; + + + @Override + public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) { + RateLimiter rateLimiter = flowControlParam.getRateLimiter(); + Double rateInitValue = flowControlParam.getRateInitValue(); + + double costTime = 0; + + // 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准 + Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel()); + if (rateLimitConfig != null && !rateInitValue.equals(rateLimitConfig)) { + rateLimiter = RateLimiter.create(rateLimitConfig); + flowControlParam.setRateInitValue(rateLimitConfig); + flowControlParam.setRateLimiter(rateLimiter); + } + if (RateLimitStrategy.REQUEST_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { + costTime = rateLimiter.acquire(1); + } + if (RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) { + costTime = rateLimiter.acquire(taskInfo.getReceiver().size()); + } + + if (costTime > 0) { + log.info("consumer {} flow control time {}", + ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime); + } + } + + /** + * 得到限流值的配置 + *

+ * apollo配置样例 key:flowControl value:{"flow_control_40":1} + *

+ * 渠道枚举可看:com.java3y.austin.common.enums.ChannelType + * + * @param channelCode + */ + private Double getRateLimitConfig(Integer channelCode) { + String flowControlConfig = config.getProperty(FLOW_CONTROL_KEY, AustinConstant.APOLLO_DEFAULT_VALUE_JSON_OBJECT); + JSONObject jsonObject = JSON.parseObject(flowControlConfig); + if (jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode) == null) { + return null; + } + return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java index ec16778..236d1f8 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java @@ -1,8 +1,12 @@ package com.java3y.austin.handler.handler; +import com.google.common.util.concurrent.RateLimiter; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import com.java3y.austin.handler.flowcontrol.FlowControlParam; +import com.java3y.austin.handler.flowcontrol.FlowControlService; import com.java3y.austin.support.utils.LogUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -13,6 +17,12 @@ import javax.annotation.PostConstruct; * 发送各个渠道的handler */ public abstract class BaseHandler implements Handler { + @Autowired + private HandlerHolder handlerHolder; + @Autowired + private LogUtils logUtils; + @Autowired + private FlowControlService flowControlService; /** * 标识渠道的Code @@ -20,12 +30,11 @@ public abstract class BaseHandler implements Handler { */ protected Integer channelCode; - - @Autowired - private HandlerHolder handlerHolder; - @Autowired - private LogUtils logUtils; - + /** + * 限流相关的参数 + * 子类初始化的时候指定 + */ + protected FlowControlParam flowControlParam; /** * 初始化渠道与Handler的映射关系 @@ -35,8 +44,20 @@ public abstract class BaseHandler implements Handler { handlerHolder.putHandler(channelCode, this); } + /** + * 流量控制 + * + * @param taskInfo + */ + public void flowControl(TaskInfo taskInfo) { + // 只有子类指定了限流参数,才需要限流 + if (flowControlParam != null) { + flowControlService.flowControl(taskInfo, flowControlParam); + } + } @Override public void doHandler(TaskInfo taskInfo) { + flowControl(taskInfo); if (handler(taskInfo)) { logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); return; @@ -44,6 +65,9 @@ public abstract class BaseHandler implements Handler { logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); } + + + /** * 统一处理的handler接口 * @@ -52,4 +76,6 @@ public abstract class BaseHandler implements Handler { */ public abstract boolean handler(TaskInfo taskInfo); + + } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java index 9a1ed77..bc4d702 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java @@ -45,7 +45,7 @@ public class DingDingRobotHandler extends BaseHandler implements Handler { @Override public boolean handler(TaskInfo taskInfo) { try { - DingDingRobotAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.DING_DING_ROBOT_ACCOUNT_KEY, SendAccountConstant.DING_DING_ROBOT_PREFIX, new DingDingRobotAccount()); + DingDingRobotAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.DING_DING_ROBOT_ACCOUNT_KEY, SendAccountConstant.DING_DING_ROBOT_PREFIX, DingDingRobotAccount.class); DingDingRobotParam dingDingRobotParam = assembleParam(taskInfo); String httpResult = HttpUtil.post(assembleParamUrl(account), JSON.toJSONString(dingDingRobotParam)); DingDingRobotResult dingDingRobotResult = JSON.parseObject(httpResult, DingDingRobotResult.class); diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java index f6211bc..3ddee90 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java @@ -50,7 +50,7 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler { @Override public boolean handler(TaskInfo taskInfo) { try { - DingDingWorkNoticeAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, new DingDingWorkNoticeAccount()); + DingDingWorkNoticeAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, DingDingWorkNoticeAccount.class); OapiMessageCorpconversationAsyncsendV2Request request = assembleParam(account, taskInfo); String accessToken = redisTemplate.opsForValue().get(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + taskInfo.getSendAccount()); OapiMessageCorpconversationAsyncsendV2Response response = new DefaultDingTalkClient(URL).execute(request, accessToken); diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java index 64e71ee..f6e8a25 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java @@ -4,10 +4,13 @@ package com.java3y.austin.handler.handler.impl; import cn.hutool.extra.mail.MailAccount; import cn.hutool.extra.mail.MailUtil; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.RateLimiter; import com.java3y.austin.common.constant.SendAccountConstant; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.model.EmailContentModel; import com.java3y.austin.common.enums.ChannelType; +import com.java3y.austin.handler.enums.RateLimitStrategy; +import com.java3y.austin.handler.flowcontrol.FlowControlParam; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.support.utils.AccountUtils; @@ -30,6 +33,13 @@ public class EmailHandler extends BaseHandler implements Handler { public EmailHandler() { channelCode = ChannelType.EMAIL.getCode(); + + // 按照请求限流,默认单机 3 qps (具体数值配置在apollo动态调整) + Double rateInitValue = Double.valueOf(3); + flowControlParam = FlowControlParam.builder().rateInitValue(rateInitValue) + .rateLimitStrategy(RateLimitStrategy.REQUEST_RATE_LIMIT) + .rateLimiter(RateLimiter.create(rateInitValue)).build(); + } @Override @@ -52,7 +62,7 @@ public class EmailHandler extends BaseHandler implements Handler { * @return */ private MailAccount getAccountConfig(Integer sendAccount) { - MailAccount account = accountUtils.getAccount(sendAccount, SendAccountConstant.EMAIL_ACCOUNT_KEY, SendAccountConstant.EMAIL_ACCOUNT_PREFIX, new MailAccount()); + MailAccount account = accountUtils.getAccount(sendAccount, SendAccountConstant.EMAIL_ACCOUNT_KEY, SendAccountConstant.EMAIL_ACCOUNT_PREFIX, MailAccount.class); try { MailSSLSocketFactory sf = new MailSSLSocketFactory(); sf.setTrustAllHosts(true); diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java index bee72a6..c2e9bbf 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java @@ -46,7 +46,7 @@ public class EnterpriseWeChatHandler extends BaseHandler implements Handler { @Override public boolean handler(TaskInfo taskInfo) { try { - WxCpDefaultConfigImpl accountConfig = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.ENTERPRISE_WECHAT_ACCOUNT_KEY, SendAccountConstant.ENTERPRISE_WECHAT_PREFIX, new WxCpDefaultConfigImpl()); + WxCpDefaultConfigImpl accountConfig = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.ENTERPRISE_WECHAT_ACCOUNT_KEY, SendAccountConstant.ENTERPRISE_WECHAT_PREFIX, WxCpDefaultConfigImpl.class); WxCpMessageServiceImpl messageService = new WxCpMessageServiceImpl(initService(accountConfig)); WxCpMessageSendResult result = messageService.send(buildWxCpMessage(taskInfo, accountConfig.getAgentId())); if (Integer.valueOf(WxMpErrorMsgEnum.CODE_0.getCode()).equals(result.getErrCode())) { diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java new file mode 100644 index 0000000..013fd15 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java @@ -0,0 +1,67 @@ +package com.java3y.austin.handler.handler.impl; + +import com.alibaba.fastjson.JSON; +import com.google.common.base.Throwables; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.common.dto.model.MiniProgramContentModel; +import com.java3y.austin.common.dto.model.OfficialAccountsContentModel; +import com.java3y.austin.common.enums.ChannelType; +import com.java3y.austin.handler.domain.wechat.WeChatMiniProgramParam; +import com.java3y.austin.handler.domain.wechat.WeChatOfficialParam; +import com.java3y.austin.handler.handler.BaseHandler; +import com.java3y.austin.handler.handler.Handler; +import com.java3y.austin.handler.script.MiniProgramAccountService; +import com.java3y.austin.handler.script.OfficialAccountService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author sunql + * 微信小程序发送订阅消息 + */ +@Component +@Slf4j +public class MiniProgramAccountHandler extends BaseHandler implements Handler { + + @Autowired + private MiniProgramAccountService miniProgramAccountService; + + public MiniProgramAccountHandler() { + channelCode = ChannelType.MINI_PROGRAM.getCode(); + } + + @Override + public boolean handler(TaskInfo taskInfo) { + WeChatMiniProgramParam miniProgramParam = buildMiniProgramParam(taskInfo); + try { + miniProgramAccountService.send(miniProgramParam); + } catch (Exception e) { + log.error("MiniProgramAccountHandler#handler fail:{},params:{}", + Throwables.getStackTraceAsString(e), JSON.toJSONString(taskInfo)); + return false; + } + return true; + } + + /** + * 通过taskInfo构建小程序订阅消息 + * + * @param taskInfo + * @return + */ + private WeChatMiniProgramParam buildMiniProgramParam(TaskInfo taskInfo) { + // 小程序订阅消息可以关联到系统业务,通过接口查询。 + WeChatMiniProgramParam miniProgramParam = WeChatMiniProgramParam.builder() + .openIds(taskInfo.getReceiver()) + .messageTemplateId(taskInfo.getMessageTemplateId()) + .sendAccount(taskInfo.getSendAccount()) + .build(); + + MiniProgramContentModel contentModel = (MiniProgramContentModel) taskInfo.getContentModel(); + miniProgramParam.setData(contentModel.getMap()); + return miniProgramParam; + } + +} + diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java index a1f274b..7e6707b 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java @@ -5,19 +5,15 @@ import com.google.common.base.Throwables; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.model.OfficialAccountsContentModel; import com.java3y.austin.common.enums.ChannelType; +import com.java3y.austin.handler.domain.wechat.WeChatOfficialParam; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.handler.script.OfficialAccountService; import lombok.extern.slf4j.Slf4j; -import me.chanjar.weixin.mp.bean.template.WxMpTemplateData; -import me.chanjar.weixin.mp.bean.template.WxMpTemplateMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Set; /** * @author zyg @@ -30,18 +26,24 @@ public class OfficialAccountHandler extends BaseHandler implements Handler { @Autowired private OfficialAccountService officialAccountService; - public OfficialAccountHandler() { channelCode = ChannelType.OFFICIAL_ACCOUNT.getCode(); } @Override public boolean handler(TaskInfo taskInfo) { + // 构建微信模板消息 + OfficialAccountsContentModel contentModel = (OfficialAccountsContentModel) taskInfo.getContentModel(); + WeChatOfficialParam officialParam = WeChatOfficialParam.builder() + .openIds(taskInfo.getReceiver()) + .messageTemplateId(taskInfo.getMessageTemplateId()) + .sendAccount(taskInfo.getSendAccount()) + .data(contentModel.getMap()) + .build(); - List mpTemplateMessages = buildTemplateMsg(taskInfo); // 微信模板消息需要记录响应结果 try { - List messageIds = officialAccountService.send(mpTemplateMessages); + List messageIds = officialAccountService.send(officialParam); log.info("OfficialAccountHandler#handler successfully messageIds:{}", messageIds); return true; } catch (Exception e) { @@ -51,45 +53,5 @@ public class OfficialAccountHandler extends BaseHandler implements Handler { return false; } - /** - * 通过taskInfo构建微信模板消息 - * - * @param taskInfo - * @return - */ - private List buildTemplateMsg(TaskInfo taskInfo) { - // 需是关注公众号的用户的OpenId - Set receiver = taskInfo.getReceiver(); - Long messageTemplateId = taskInfo.getMessageTemplateId(); - // 微信模板消息可以关联到系统业务,通过接口查询。 - String templateId = getRealWxMpTemplateId(messageTemplateId); - List wxMpTemplateMessages = new ArrayList<>(receiver.size()); - OfficialAccountsContentModel contentModel = (OfficialAccountsContentModel) taskInfo.getContentModel(); - String url = contentModel.getUrl(); - Map param = contentModel.getMap(); - - // 构建微信模板消息 - for (String openId : receiver) { - WxMpTemplateMessage templateMessage = WxMpTemplateMessage.builder() - .toUser(openId) - .templateId(templateId) - .url(url) - .build(); - // WxMpTemplateData 对应模板消息 键 -- 值 -- color - param.forEach((k, v) -> templateMessage.addData(new WxMpTemplateData(k, v))); - wxMpTemplateMessages.add(templateMessage); - } - return wxMpTemplateMessages; - } - - /** - * 根据模板id获取真实的模板id - * - * @param messageTemplateId 系统业务模板id - * @return - */ - private String getRealWxMpTemplateId(Long messageTemplateId) { - return String.valueOf(messageTemplateId); - } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java new file mode 100644 index 0000000..93feeef --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java @@ -0,0 +1,164 @@ +package com.java3y.austin.handler.handler.impl; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.IdUtil; +import cn.hutool.http.ContentType; +import cn.hutool.http.Header; +import cn.hutool.http.HttpRequest; +import com.alibaba.fastjson.JSON; +import com.google.common.base.Throwables; +import com.java3y.austin.common.constant.SendAccountConstant; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.common.dto.account.GeTuiAccount; +import com.java3y.austin.common.dto.model.PushContentModel; +import com.java3y.austin.common.enums.ChannelType; + +import com.java3y.austin.handler.domain.push.PushParam; +import com.java3y.austin.handler.domain.push.getui.BatchSendPushParam; +import com.java3y.austin.handler.domain.push.getui.SendPushParam; +import com.java3y.austin.handler.domain.push.getui.SendPushResult; +import com.java3y.austin.handler.handler.BaseHandler; +import com.java3y.austin.handler.handler.Handler; +import com.java3y.austin.support.utils.AccountUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.Set; + +/** + * 通知栏消息发送处理 + *

+ * (目前具体的实现是个推服务商,安卓端已验证) + * + * @author 3y + */ +@Component +@Slf4j +public class PushHandler extends BaseHandler implements Handler { + + private static final String BASE_URL = "https://restapi.getui.com/v2/"; + private static final String SINGLE_PUSH_PATH = "/push/single/cid"; + private static final String BATCH_PUSH_CREATE_TASK_PATH = "/push/list/message"; + private static final String BATCH_PUSH_PATH = "/push/list/cid"; + + public PushHandler() { + channelCode = ChannelType.PUSH.getCode(); + } + + @Autowired + private AccountUtils accountUtils; + @Autowired + private StringRedisTemplate redisTemplate; + + + @Override + public boolean handler(TaskInfo taskInfo) { + + try { + GeTuiAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.GE_TUI_ACCOUNT_KEY, SendAccountConstant.GE_TUI_ACCOUNT_PREFIX, GeTuiAccount.class); + String token = redisTemplate.opsForValue().get(SendAccountConstant.GE_TUI_ACCESS_TOKEN_PREFIX + taskInfo.getSendAccount()); + PushParam pushParam = PushParam.builder().token(token).appId(account.getAppId()).taskInfo(taskInfo).build(); + + String result; + if (taskInfo.getReceiver().size() == 1) { + result = singlePush(pushParam); + } else { + result = batchPush(createTaskId(pushParam), pushParam); + } + SendPushResult sendPushResult = JSON.parseObject(result, SendPushResult.class); + if (sendPushResult.getCode().equals(0)) { + return true; + } + // 常见的错误 应当 关联至 AnchorState,由austin后台统一透出失败原因 + log.error("PushHandler#handler fail!result:{},params:{}", JSON.toJSONString(sendPushResult), JSON.toJSONString(taskInfo)); + } catch (Exception e) { + log.error("PushHandler#handler fail!e:{},params:{}", Throwables.getStackTraceAsString(e), JSON.toJSONString(taskInfo)); + } + return false; + } + + + /** + * 单推 + * @param pushParam + * @return http result + */ + private String singlePush(PushParam pushParam) { + String url = BASE_URL + pushParam.getAppId() + SINGLE_PUSH_PATH; + SendPushParam sendPushParam = assembleParam((PushContentModel) pushParam.getTaskInfo().getContentModel(), pushParam.getTaskInfo().getReceiver()); + String body = HttpRequest.post(url).header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue()) + .header("token", pushParam.getToken()) + .body(JSON.toJSONString(sendPushParam)) + .timeout(2000) + .execute().body(); + return body; + } + + + /** + * 批量推送 + * + * @param taskId 个推 返回的任务Id + * @param pushParam + * @return + */ + private String batchPush(String taskId, PushParam pushParam) { + String url = BASE_URL + pushParam.getAppId() + BATCH_PUSH_PATH; + BatchSendPushParam batchSendPushParam = BatchSendPushParam.builder() + .taskId(taskId) + .isAsync(true) + .audience(BatchSendPushParam.AudienceVO.builder().cid(pushParam.getTaskInfo().getReceiver()).build()).build(); + String body = HttpRequest.post(url).header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue()) + .header("token", pushParam.getToken()) + .body(JSON.toJSONString(batchSendPushParam)) + .timeout(2000) + .execute().body(); + return body; + } + + + /** + * 群推前需要构建taskId + * @param pushParam + * @return http result + */ + private String createTaskId(PushParam pushParam) { + String url = BASE_URL + pushParam.getAppId() + BATCH_PUSH_CREATE_TASK_PATH; + SendPushParam param = assembleParam((PushContentModel) pushParam.getTaskInfo().getContentModel()); + String taskId = ""; + try { + String body = HttpRequest.post(url).header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue()) + .header("token", pushParam.getToken()) + .body(JSON.toJSONString(param)) + .timeout(2000) + .execute().body(); + + taskId = JSON.parseObject(body, SendPushResult.class).getData().getString("taskId"); + } catch (Exception e) { + log.error("PushHandler#createTaskId fail :{},params:{}", Throwables.getStackTraceAsString(e), JSON.toJSONString(pushParam.getTaskInfo())); + } + + return taskId; + } + + + private SendPushParam assembleParam(PushContentModel pushContentModel) { + return assembleParam(pushContentModel, null); + } + + private SendPushParam assembleParam(PushContentModel pushContentModel, Set cid) { + SendPushParam param = SendPushParam.builder() + .requestId(String.valueOf(IdUtil.getSnowflake().nextId())) + .pushMessage(SendPushParam.PushMessageVO.builder().notification(SendPushParam.PushMessageVO.NotificationVO.builder() + .title(pushContentModel.getTitle()).body(pushContentModel.getContent()).clickType("startapp").build()) + .build()) + .build(); + if (CollUtil.isNotEmpty(cid)) { + param.setAudience(SendPushParam.AudienceVO.builder().cid(cid).build()); + } + return param; + } + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java b/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java index 9190040..eb5f1af 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/pending/Task.java @@ -18,8 +18,9 @@ import org.springframework.stereotype.Component; /** * Task 执行器 * 0.丢弃消息 - * 1.通用去重功能 - * 2.发送消息 + * 2.屏蔽消息 + * 2.通用去重功能 + * 3.发送消息 * * @author 3y */ diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/script/MiniProgramAccountService.java b/austin-handler/src/main/java/com/java3y/austin/handler/script/MiniProgramAccountService.java new file mode 100644 index 0000000..7a4b6da --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/script/MiniProgramAccountService.java @@ -0,0 +1,19 @@ +package com.java3y.austin.handler.script; + +import com.java3y.austin.handler.domain.wechat.WeChatMiniProgramParam; + +/** + * @author sunql + */ +public interface MiniProgramAccountService { + + /** + * 发送订阅消息 + * + * @param miniProgramParam 订阅消息参数 + * @return + * @throws Exception + */ + void send(WeChatMiniProgramParam miniProgramParam) throws Exception; + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/script/OfficialAccountService.java b/austin-handler/src/main/java/com/java3y/austin/handler/script/OfficialAccountService.java index 868e76f..8451869 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/script/OfficialAccountService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/script/OfficialAccountService.java @@ -1,5 +1,6 @@ package com.java3y.austin.handler.script; +import com.java3y.austin.handler.domain.wechat.WeChatOfficialParam; import me.chanjar.weixin.mp.bean.template.WxMpTemplateMessage; import java.util.List; @@ -12,10 +13,10 @@ public interface OfficialAccountService { /** * 发送模板消息 * - * @param wxMpTemplateMessages 模板消息列表 + * @param weChatOfficialParam 模板消息参数 * @return * @throws Exception */ - List send(List wxMpTemplateMessages) throws Exception; + List send(WeChatOfficialParam weChatOfficialParam) throws Exception; } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/MiniProgramAccountServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/MiniProgramAccountServiceImpl.java new file mode 100644 index 0000000..ddb2e21 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/MiniProgramAccountServiceImpl.java @@ -0,0 +1,93 @@ +package com.java3y.austin.handler.script.impl; + +import cn.binarywang.wx.miniapp.api.WxMaService; +import cn.binarywang.wx.miniapp.api.WxMaSubscribeService; +import cn.binarywang.wx.miniapp.api.impl.WxMaServiceImpl; +import cn.binarywang.wx.miniapp.api.impl.WxMaSubscribeServiceImpl; +import cn.binarywang.wx.miniapp.bean.WxMaSubscribeMessage; +import cn.binarywang.wx.miniapp.config.impl.WxMaDefaultConfigImpl; +import com.java3y.austin.common.constant.SendAccountConstant; +import com.java3y.austin.common.dto.account.WeChatMiniProgramAccount; +import com.java3y.austin.handler.domain.wechat.WeChatMiniProgramParam; +import com.java3y.austin.handler.script.MiniProgramAccountService; +import com.java3y.austin.support.utils.AccountUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @author sunql + * @date 2022年05月06日 16:41 + */ +@Service +@Slf4j +public class MiniProgramAccountServiceImpl implements MiniProgramAccountService { + + @Autowired + private AccountUtils accountUtils; + + @Override + public void send(WeChatMiniProgramParam miniProgramParam) throws Exception { + WeChatMiniProgramAccount miniProgramAccount = accountUtils.getAccount(miniProgramParam.getSendAccount(), + SendAccountConstant.WECHAT_MINI_PROGRAM_ACCOUNT_KEY, + SendAccountConstant.WECHAT_MINI_PROGRAM_PREFIX, + WeChatMiniProgramAccount.class); + + WxMaSubscribeService wxMaSubscribeService = initService(miniProgramAccount); + List subscribeMessageList = assembleReq(miniProgramParam, miniProgramAccount); + for (WxMaSubscribeMessage subscribeMessage : subscribeMessageList) { + wxMaSubscribeService.sendSubscribeMsg(subscribeMessage); + } + } + + /** + * 组装发送模板信息参数 + */ + private List assembleReq(WeChatMiniProgramParam miniProgramParam, WeChatMiniProgramAccount miniProgramAccount) { + Set receiver = miniProgramParam.getOpenIds(); + List messageList = new ArrayList<>(receiver.size()); + + // 构建微信小程序订阅消息 + for (String openId : receiver) { + WxMaSubscribeMessage subscribeMessage = WxMaSubscribeMessage.builder() + .toUser(openId) + .data(getWxMTemplateData(miniProgramParam.getData())) + .miniprogramState(miniProgramAccount.getMiniProgramState()) + .templateId(miniProgramAccount.getTemplateId()) + .page(miniProgramAccount.getPage()) + .build(); + messageList.add(subscribeMessage); + } + return messageList; + } + + /** + * 构建订阅消息参数 + * + * @returnp + */ + private List getWxMTemplateData(Map data) { + List templateDataList = new ArrayList<>(data.size()); + data.forEach((k, v) -> templateDataList.add(new WxMaSubscribeMessage.MsgData(k, v))); + return templateDataList; + } + + /** + * 初始化微信小程序 + * + * @return + */ + private WxMaSubscribeServiceImpl initService(WeChatMiniProgramAccount miniProgramAccount) { + WxMaService wxMaService = new WxMaServiceImpl(); + WxMaDefaultConfigImpl wxMaConfig = new WxMaDefaultConfigImpl(); + wxMaConfig.setAppid(miniProgramAccount.getAppId()); + wxMaConfig.setSecret(miniProgramAccount.getAppSecret()); + wxMaService.setWxMaConfig(wxMaConfig); + return new WxMaSubscribeServiceImpl(wxMaService); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/OfficialAccountServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/OfficialAccountServiceImpl.java index fe69c4b..a0f68fc 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/OfficialAccountServiceImpl.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/OfficialAccountServiceImpl.java @@ -1,16 +1,23 @@ package com.java3y.austin.handler.script.impl; +import com.java3y.austin.common.constant.SendAccountConstant; +import com.java3y.austin.common.dto.account.WeChatOfficialAccount; +import com.java3y.austin.handler.domain.wechat.WeChatOfficialParam; import com.java3y.austin.handler.script.OfficialAccountService; +import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.mp.api.WxMpService; import me.chanjar.weixin.mp.api.impl.WxMpServiceImpl; +import me.chanjar.weixin.mp.bean.template.WxMpTemplateData; import me.chanjar.weixin.mp.bean.template.WxMpTemplateMessage; import me.chanjar.weixin.mp.config.impl.WxMpDefaultConfigImpl; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; /** * @author zyg @@ -19,19 +26,14 @@ import java.util.List; @Slf4j public class OfficialAccountServiceImpl implements OfficialAccountService { - @Value("${wx.mp.account.appid}") - private String appId; - @Value("${wx.mp.account.secret}") - private String secret; - @Value("${wx.mp.account.token}") - private String token; - @Value("${wx.mp.account.aesKey}") - private String aesKey; - + @Autowired + private AccountUtils accountUtils; @Override - public List send(List messages) throws Exception { - WxMpService wxMpService = initService(); + public List send(WeChatOfficialParam officialParam) throws Exception { + WeChatOfficialAccount officialAccount = accountUtils.getAccount(officialParam.getSendAccount(), SendAccountConstant.WECHAT_OFFICIAL_ACCOUNT_KEY, SendAccountConstant.WECHAT_OFFICIAL__PREFIX, WeChatOfficialAccount.class); + WxMpService wxMpService = initService(officialAccount); + List messages = assembleReq(officialParam, officialAccount); List messageIds = new ArrayList<>(messages.size()); for (WxMpTemplateMessage wxMpTemplateMessage : messages) { String msgId = wxMpService.getTemplateMsgService().sendTemplateMsg(wxMpTemplateMessage); @@ -40,18 +42,48 @@ public class OfficialAccountServiceImpl implements OfficialAccountService { return messageIds; } + /** + * 组装发送模板信息参数 + */ + private List assembleReq(WeChatOfficialParam officialParam, WeChatOfficialAccount officialAccount) { + Set receiver = officialParam.getOpenIds(); + List wxMpTemplateMessages = new ArrayList<>(receiver.size()); + + // 构建微信模板消息 + for (String openId : receiver) { + WxMpTemplateMessage templateMessage = WxMpTemplateMessage.builder() + .toUser(openId) + .templateId(officialAccount.getTemplateId()) + .url(officialAccount.getUrl()) + .data(getWxMpTemplateData(officialParam.getData())) + .miniProgram(new WxMpTemplateMessage.MiniProgram(officialAccount.getMiniProgramId(), officialAccount.getPath(), false)) + .build(); + wxMpTemplateMessages.add(templateMessage); + } + return wxMpTemplateMessages; + } + + /** + * 构建模板消息参数 + * + * @return + */ + private List getWxMpTemplateData(Map data) { + List templateDataList = new ArrayList<>(data.size()); + data.forEach((k, v) -> templateDataList.add(new WxMpTemplateData(k, v))); + return templateDataList; + } + /** * 初始化微信服务号 * * @return */ - public WxMpService initService() { + public WxMpService initService(WeChatOfficialAccount officialAccount) { WxMpService wxMpService = new WxMpServiceImpl(); WxMpDefaultConfigImpl config = new WxMpDefaultConfigImpl(); - config.setAppId(appId); - config.setSecret(secret); - config.setToken(token); - config.setAesKey(aesKey); + config.setAppId(officialAccount.getAppId()); + config.setSecret(officialAccount.getSecret()); wxMpService.setWxMpConfigStorage(config); return wxMpService; } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/TencentSmsScript.java b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/TencentSmsScript.java index 1a1201e..f3bb434 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/TencentSmsScript.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/script/impl/TencentSmsScript.java @@ -44,7 +44,7 @@ public class TencentSmsScript implements SmsScript { @Override public List send(SmsParam smsParam) throws Exception { - TencentSmsAccount tencentSmsAccount = accountUtils.getAccount(smsParam.getSendAccount(), SendAccountConstant.SMS_ACCOUNT_KEY, SendAccountConstant.SMS_PREFIX, TencentSmsAccount.builder().build()); + TencentSmsAccount tencentSmsAccount = accountUtils.getAccount(smsParam.getSendAccount(), SendAccountConstant.SMS_ACCOUNT_KEY, SendAccountConstant.SMS_PREFIX, TencentSmsAccount.class); SmsClient client = init(tencentSmsAccount); SendSmsRequest request = assembleReq(smsParam, tencentSmsAccount); SendSmsResponse response = client.SendSms(request); diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java index 475fd1b..402b0ac 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java @@ -11,10 +11,10 @@ import com.java3y.austin.handler.shield.ShieldService; import com.java3y.austin.support.utils.LogUtils; import com.java3y.austin.support.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.time.DateFormatUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.time.LocalDateTime; import java.util.Date; import java.util.HashSet; @@ -60,7 +60,7 @@ public class ShieldServiceImpl implements ShieldService { * @return */ private boolean isNight() { - return Integer.valueOf(DateFormatUtils.format(new Date(), "HH")) < 8; + return LocalDateTime.now().getHour() < 8; } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java index 39216ba..8c97488 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java @@ -13,6 +13,7 @@ import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.pipeline.BusinessProcess; import com.java3y.austin.support.pipeline.ProcessContext; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; import java.util.Iterator; import java.util.List; @@ -25,6 +26,7 @@ import java.util.stream.Collectors; * 后置参数检查 */ @Slf4j +@Service public class AfterParamCheckAction implements BusinessProcess { diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java index 7114bc4..b69592d 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java @@ -21,6 +21,7 @@ import com.java3y.austin.support.utils.ContentHolderUtil; import com.java3y.austin.support.utils.TaskInfoUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; import java.lang.reflect.Field; import java.util.*; @@ -31,6 +32,7 @@ import java.util.*; * @description 拼装参数 */ @Slf4j +@Service public class AssembleAction implements BusinessProcess { @Autowired @@ -112,7 +114,8 @@ public class AssembleAction implements BusinessProcess { if (StrUtil.isNotBlank(originValue)) { String resultValue = ContentHolderUtil.replacePlaceHolder(originValue, variables); - ReflectUtil.setFieldValue(contentModel, field, resultValue); + Object resultObj = JSON.parseObject(resultValue, field.getType()); + ReflectUtil.setFieldValue(contentModel, field, resultObj); } } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java index 794e1d9..2c1d152 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java @@ -9,6 +9,7 @@ import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.pipeline.BusinessProcess; import com.java3y.austin.support.pipeline.ProcessContext; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; import java.util.List; import java.util.stream.Collectors; @@ -19,6 +20,7 @@ import java.util.stream.Collectors; * @description 前置参数校验 */ @Slf4j +@Service public class PreParamCheckAction implements BusinessProcess { @Override diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index 8bb27df..8a4ad69 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -13,12 +13,14 @@ import com.java3y.austin.support.utils.KafkaUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; /** * @author 3y * 将消息发送到MQ */ @Slf4j +@Service public class SendMqAction implements BusinessProcess { @Autowired diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java index 5639735..662888b 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java @@ -6,15 +6,15 @@ import com.java3y.austin.service.api.impl.action.AfterParamCheckAction; import com.java3y.austin.service.api.impl.action.AssembleAction; import com.java3y.austin.service.api.impl.action.PreParamCheckAction; import com.java3y.austin.service.api.impl.action.SendMqAction; +import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.pipeline.BusinessProcess; import com.java3y.austin.support.pipeline.ProcessController; import com.java3y.austin.support.pipeline.ProcessTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; +import java.util.*; /** * api层的pipeline配置类 @@ -23,6 +23,14 @@ import java.util.Map; @Configuration public class PipelineConfig { + @Autowired + private PreParamCheckAction preParamCheckAction; + @Autowired + private AssembleAction assembleAction; + @Autowired + private AfterParamCheckAction afterParamCheckAction; + @Autowired + private SendMqAction sendMqAction; /** * 普通发送执行流程 @@ -35,14 +43,8 @@ public class PipelineConfig { @Bean("commonSendTemplate") public ProcessTemplate commonSendTemplate() { ProcessTemplate processTemplate = new ProcessTemplate(); - ArrayList processList = new ArrayList<>(); - - processList.add(preParamCheckAction()); - processList.add(assembleAction()); - processList.add(afterParamCheckAction()); - processList.add(sendMqAction()); - - processTemplate.setProcessList(processList); + processTemplate.setProcessList(Arrays.asList(preParamCheckAction, assembleAction, + afterParamCheckAction, sendMqAction)); return processTemplate; } @@ -62,45 +64,4 @@ public class PipelineConfig { return processController; } - - /** - * 组装参数Action - * - * @return - */ - @Bean - public AssembleAction assembleAction() { - return new AssembleAction(); - } - - /** - * 前置参数校验Action - * - * @return - */ - @Bean - public PreParamCheckAction preParamCheckAction() { - return new PreParamCheckAction(); - } - - /** - * 后置参数校验Action - * - * @return - */ - @Bean - public AfterParamCheckAction afterParamCheckAction() { - return new AfterParamCheckAction(); - } - - /** - * 发送消息至MQ的Action - * - * @return - */ - @Bean - public SendMqAction sendMqAction() { - return new SendMqAction(); - } - } diff --git a/austin-service-api-impl/src/test/java/com/java3y/austin/service/api/impl/service/SendServiceImplTest.java b/austin-service-api-impl/src/test/java/com/java3y/austin/service/api/impl/service/SendServiceImplTest.java new file mode 100644 index 0000000..6e8e5cd --- /dev/null +++ b/austin-service-api-impl/src/test/java/com/java3y/austin/service/api/impl/service/SendServiceImplTest.java @@ -0,0 +1,95 @@ +package com.java3y.austin.service.api.impl.service; + +import com.java3y.austin.common.enums.RespStatusEnum; +import com.java3y.austin.common.vo.BasicResultVO; +import com.java3y.austin.service.api.domain.BatchSendRequest; +import com.java3y.austin.service.api.domain.MessageParam; +import com.java3y.austin.service.api.domain.SendRequest; +import com.java3y.austin.service.api.domain.SendResponse; +import com.java3y.austin.service.api.enums.BusinessCode; +import com.java3y.austin.service.api.impl.domain.SendTaskModel; +import com.java3y.austin.support.pipeline.BusinessProcess; +import com.java3y.austin.support.pipeline.ProcessContext; +import com.java3y.austin.support.pipeline.ProcessController; +import com.java3y.austin.support.pipeline.ProcessTemplate; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class SendServiceImplTest { + + @Spy + private ProcessController processController; + + @Mock + private Map templateConfig; + + @Spy + private ProcessTemplate processTemplate; + + @Mock + private BusinessProcess businessProcess; + + @InjectMocks + private SendServiceImpl sendServiceImplUnderTest; + + @Test + void testSend() { + + // params + final SendRequest sendRequest = new SendRequest("send", 1L, + new MessageParam("13711111111", new HashMap<>(), new HashMap<>())); + + // predict result + final ProcessContext processContext = new ProcessContext<>(sendRequest.getCode(), new SendTaskModel(), false, new BasicResultVO<>( + RespStatusEnum.SUCCESS, "data")); + final SendResponse expectedResult = new SendResponse(processContext.getResponse().getStatus(), processContext.getResponse().getMsg()); + + + // stub + Map templateConfig = new HashMap<>(4); + processTemplate.setProcessList(Arrays.asList(businessProcess)); + templateConfig.put(BusinessCode.COMMON_SEND.getCode(), processTemplate); + + processController.setTemplateConfig(templateConfig); + + + // Run the test + final SendResponse result = sendServiceImplUnderTest.send(sendRequest); + + // Verify the results + assertEquals(expectedResult, result); + } + + @Test + void testBatchSend() { + // Setup + final BatchSendRequest batchSendRequest = new BatchSendRequest("code", 0L, + Arrays.asList(new MessageParam("receiver", new HashMap<>(), new HashMap<>()))); + final SendResponse expectedResult = new SendResponse("status", "msg"); + + // Configure ProcessController.process(...). + final ProcessContext processContext = new ProcessContext<>("code", null, false, new BasicResultVO<>( + RespStatusEnum.SUCCESS, "data")); + when(processController.process(new ProcessContext<>("code", null, false, new BasicResultVO<>( + RespStatusEnum.SUCCESS, "data")))).thenReturn(processContext); + + // Run the test + final SendResponse result = sendServiceImplUnderTest.batchSend(batchSendRequest); + + // Verify the results + assertEquals(expectedResult, result); + } +} diff --git a/austin-support/pom.xml b/austin-support/pom.xml index 6a9b651..96ae2e0 100644 --- a/austin-support/pom.xml +++ b/austin-support/pom.xml @@ -22,6 +22,11 @@ org.springframework.boot spring-boot-starter + + org.springframework.boot + spring-boot-starter-test + + mysql mysql-connector-java diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/AccountUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/AccountUtils.java index 2d3792f..5d08a57 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/AccountUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/AccountUtils.java @@ -24,16 +24,19 @@ public class AccountUtils { * (key:smsAccount)短信参数示例:[{"sms_10":{"url":"sms.tencentcloudapi.com","region":"ap-guangzhou","secretId":"AKIDhDUUDfffffMEqBF1WljQq","secretKey":"B4h39yWnfffff7D2btue7JErDJ8gxyi","smsSdkAppId":"140025","templateId":"11897","signName":"Java3y公众号","supplierId":10,"supplierName":"腾讯云"}}] * (key:emailAccount)邮件参数示例:[{"email_10":{"host":"smtp.qq.com","port":465,"user":"403686131@qq.com","pass":"","from":"403686131@qq.com"}}] * (key:enterpriseWechatAccount)企业微信参数示例:[{"enterprise_wechat_10":{"corpId":"wwf87603333e00069c","corpSecret":"-IFWxS2222QxzPIorNVUQn144444D915DM","agentId":10044442,"token":"rXROB3333Kf6i","aesKey":"MKZtoFxHIM44444M7ieag3r9ZPUsl"}}] - * (key:dingDingRobotAccount) 钉钉自定义机器人参数实例:[{"ding_ding_robot_10":{"secret":"SEC996d8d9d4768aded74114faae924f229229de444475a1c295d64fedf","webhook":"https://oapi.dingtalk.com/robot/send?access_token=8d03b644ffb6534b203d87333367328b0c3003d164715d2c6c6e56"}}] + * (key:dingDingRobotAccount) 钉钉自定义机器人参数示例:[{"ding_ding_robot_10":{"secret":"SEC996d8d9d4768aded74114faae924f229229de444475a1c295d64fedf","webhook":"https://oapi.dingtalk.com/robot/send?access_token=8d03b644ffb6534b203d87333367328b0c3003d164715d2c6c6e56"}}] + * (key:dingDingWorkNoticeAccount) 钉钉工作消息参数示例:[{"ding_ding_work_notice_10":{"appKey":"dingh6yyyyyyycrlbx","appSecret":"tQpvmkR863333yyyyyHP3QHyyyymy9Ao1yoL1oQX5Nlx_fYLLLlpPJWHvWKbTu","agentId":"152333383622"}}] + * (key:officialAccount) 微信服务号模板消息参数示例:[{"official_10":{"appId":"wxecb4693d2eef1ea7","secret":"6240870f4d91701640d769ba20120821","templateId":"JHUk6eE9T5Ts7a5JO3ZQqkBBrZBGn5C9iIiKNDQsk-Q","url":"http://weixin.qq.com/download","miniProgramId":"xiaochengxuappid12345","path":"index?foo=bar"}}] + * (key:miniProgramAccount) 微信小程序订阅消息参数示例:[{"mini_program_10":{"appId":"wxecb4693d2eef1ea7","appSecret":"6240870f4d91701640d769ba20120821","templateId":"JHUk6eE9T5Ts7a5JO3ZQqkBBrZBGn5C9iIiKNDQsk-Q","grantType":"client_credential","miniProgramState":"trial","page":"index?foo=bar"}}] */ - public T getAccount(Integer sendAccount, String apolloKey, String prefix, T t) { + public T getAccount(Integer sendAccount, String apolloKey, String prefix, Class clazz) { String accountValues = config.getProperty(apolloKey, AustinConstant.APOLLO_DEFAULT_VALUE_JSON_ARRAY); JSONArray jsonArray = JSON.parseArray(accountValues); for (int i = 0; i < jsonArray.size(); i++) { JSONObject jsonObject = jsonArray.getJSONObject(i); - Object object = jsonObject.getObject(prefix + sendAccount, t.getClass()); + T object = jsonObject.getObject(prefix + sendAccount, clazz); if (object != null) { - return (T) object; + return object; } } return null; diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java index 9d7f905..38d23b4 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java @@ -2,10 +2,12 @@ package com.java3y.austin.support.utils; import cn.hutool.core.collection.CollUtil; import com.google.common.base.Throwables; +import com.java3y.austin.common.constant.AustinConstant; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import java.util.HashMap; @@ -93,7 +95,6 @@ public class RedisUtils { /** * lpush 方法 并指定 过期时间 - * */ public void lPush(String key, String value, Long seconds) { try { @@ -109,7 +110,6 @@ public class RedisUtils { /** * lLen 方法 - * */ public Long lLen(String key) { try { @@ -119,9 +119,9 @@ public class RedisUtils { } return 0L; } + /** * lPop 方法 - * */ public String lPop(String key) { try { @@ -151,4 +151,33 @@ public class RedisUtils { log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e)); } } + + /** + * 执行指定的lua脚本返回执行结果 + * --KEYS[1]: 限流 key + * --ARGV[1]: 限流窗口 + * --ARGV[2]: 当前时间戳(作为score) + * --ARGV[3]: 阈值 + * --ARGV[4]: score 对应的唯一value + * + * @param redisScript + * @param keys + * @param args + * @return + */ + public Boolean execLimitLua(RedisScript redisScript, List keys, String... args) { + + try { + Long execute = redisTemplate.execute(redisScript, keys, args); + + return AustinConstant.TRUE.equals(execute.intValue()); + } catch (Exception e) { + + log.error("redis execLimitLua fail! e:{}", Throwables.getStackTraceAsString(e)); + } + + return false; + } + + } diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/SnowFlakeIdUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/SnowFlakeIdUtils.java new file mode 100644 index 0000000..925ea8b --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/SnowFlakeIdUtils.java @@ -0,0 +1,108 @@ +package com.java3y.austin.support.utils; + +/** + * 雪花算法生成唯一id工具类 + * + * @author cao + * @date 2022-04-20 13:12 + */ +public class SnowFlakeIdUtils { + + /** + * 初始时间截 (2017-01-01) + */ + private final static long START_TIMESTAMP = 1483200000000L; + + /** + * 每一部分占用的位数 + */ + private final static long SEQUENCE_BIT = 12; //***占用的位数 + private final static long MACHINE_BIT = 5; //机器标识占用的位数 + private final static long DATA_CENTER_BIT = 5; //数据中心占用的位数 + + /** + * 每一部分的最大值 + */ + private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT); + private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT); + private final static long MAX_DATA_CENTER_NUM = -1L ^ (-1L << DATA_CENTER_BIT); + + /** + * 每一部分向左的位移 + */ + private final static long MACHINE_LEFT = SEQUENCE_BIT; + private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; + private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT; + + private long dataCenterId; //数据中心 + private long machineId; //机器标识 + private long sequence = 0L; //*** + private long lastTimeStamp = -1L; //上一次时间戳 + + + /** + * 根据指定的数据中心ID和机器标志ID生成指定的*** + * + * @param dataCenterId 数据中心ID + * @param machineId 机器标志ID + */ + public SnowFlakeIdUtils(long dataCenterId, long machineId) { + if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) { + throw new IllegalArgumentException(String.format("DtaCenterId 不能大于 %d 或小于 0", MAX_DATA_CENTER_NUM)); + } + if (machineId > MAX_MACHINE_NUM || machineId < 0) { + throw new IllegalArgumentException(String.format("MachineId 不能大于 %d 或小于 0", MAX_MACHINE_NUM)); + } + this.dataCenterId = dataCenterId; + this.machineId = machineId; + } + + + /** + * 产生下一个ID + * + * @return + */ + public synchronized long nextId() { + long currTimeStamp = System.currentTimeMillis(); + if (currTimeStamp < lastTimeStamp) { + throw new RuntimeException("当前时间小于上一次记录的时间戳!"); + } + + if (currTimeStamp == lastTimeStamp) { + //相同毫秒内,***自增 + sequence = (sequence + 1) & MAX_SEQUENCE; + //同一毫秒的序列数已经达到最大 + if (sequence == 0L) { + currTimeStamp = getNextMill(); + } + } else { + //不同毫秒内,***置为0 + sequence = 0L; + } + + lastTimeStamp = currTimeStamp; + + return (currTimeStamp - START_TIMESTAMP) << TIMESTAMP_LEFT //时间戳部分 + | dataCenterId << DATA_CENTER_LEFT //数据中心部分 + | machineId << MACHINE_LEFT //机器标识部分 + | sequence; //***部分 + } + + + /** + * 阻塞到下一个毫秒,直到获得新的时间戳 + * + * @return 当前时间戳 + */ + private long getNextMill() { + + long mill = System.currentTimeMillis(); + while (mill <= lastTimeStamp) { + mill = System.currentTimeMillis(); + } + return mill; + + } + +} diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index 103494b..ef0bf5f 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -87,10 +87,3 @@ management.endpoint.metrics.enabled=true management.endpoint.prometheus.enabled=true management.endpoints.web.exposure.include=* management.metrics.export.prometheus.enabled=true - -##################### wx mp config ##################### -##################### TODO not test by 3y,wait to apply for OfficialAccount ##################### -wx.mp.account.appid="appid" -wx.mp.account.secret="secret" -wx.mp.account.token="token" -wx.mp.account.aesKey="aesKey" \ No newline at end of file diff --git a/austin-web/src/main/resources/limit.lua b/austin-web/src/main/resources/limit.lua new file mode 100644 index 0000000..48b91ec --- /dev/null +++ b/austin-web/src/main/resources/limit.lua @@ -0,0 +1,17 @@ +--KEYS[1]: 限流 key +--ARGV[1]: 限流窗口,毫秒 +--ARGV[2]: 当前时间戳(作为score) +--ARGV[3]: 阈值 +--ARGV[4]: score 对应的唯一value +-- 1\. 移除开始时间窗口之前的数据 +redis.call('zremrangeByScore', KEYS[1], 0, ARGV[2]-ARGV[1]) +-- 2\. 统计当前元素数量 +local res = redis.call('zcard', KEYS[1]) +-- 3\. 是否超过阈值 +if (res == nil) or (res < tonumber(ARGV[3])) then + redis.call('zadd', KEYS[1], ARGV[2], ARGV[4]) + redis.call('expire', KEYS[1], ARGV[1]/1000) + return 0 +else + return 1 +end diff --git a/pom.xml b/pom.xml index 6388c08..68c2ffa 100644 --- a/pom.xml +++ b/pom.xml @@ -160,6 +160,13 @@ ${weixin-java} + + + com.github.binarywang + weixin-java-miniapp + ${weixin-java} + + io.github.lyh200