Merge branch 'master' into vip

pull/9/head
3y 2 years ago
commit f2cd7532a9

@ -122,18 +122,17 @@ curl -XPOST "127.0.0.1:8080/send" -H 'Content-Type: application/json' -d '{"co
- [x] 企业微信渠道接入
- [x] 夜间屏蔽次日早晨推送xxl-job定时任务框架另类的延时队列
- [x] 钉钉渠道接入
- [ ] 编写单测
- [ ] 持续提高消息推送系统的影响力,让更多的业务方了解其功能,进而挖掘更多拉新和唤醒用户的玩法,提高站内的次留率和转化率
- [ ] 优化代码
- [ ] 接入微信服务号渠道
- [ ] 接入微信小程序渠道
- [ ] 接入PUSH渠道
- [ ] 接入工作流引擎实现对消息工单审核
- [x] 单机限流实现
- [x] 引入单测框架,编写部分单测用例
- [x] 接入微信服务号渠道(已有pull request代码)
- [x] 接入微信小程序渠道(已有pull request代码)
- [x] 接入PUSH渠道
- [ ] 总体架构已完成,持续做基础建设和优化代码
**近期更新时间**2022年3月30日
**近期更新时间**5月9号
**近期更新功能**钉钉群自定义机器人与工作消息渠道接入完成
**近期更新功能**接入个推PUSH安卓发送推送消息
## 项目交流

@ -12,6 +12,13 @@ package com.java3y.austin.common.constant;
*/
public class SendAccountConstant {
/**
* 1010
*/
public static final Integer START = 10;
public static final Integer STEP = 10;
/**
*
*/
@ -19,6 +26,14 @@ public class SendAccountConstant {
public static final String DING_DING_WORK_NOTICE_PREFIX = "ding_ding_work_notice_";
public static final String DING_DING_ACCESS_TOKEN_PREFIX = "ding_ding_access_token_";
/**
* PUSH
*/
public static final String GE_TUI_ACCOUNT_KEY = "geTuiAccount";
public static final String GE_TUI_ACCOUNT_PREFIX = "ge_tui_account_";
public static final String GE_TUI_ACCESS_TOKEN_PREFIX = "ge_tui_access_token_";
/**
*
@ -39,6 +54,17 @@ public class SendAccountConstant {
public static final String ENTERPRISE_WECHAT_ACCOUNT_KEY = "enterpriseWechatAccount";
public static final String ENTERPRISE_WECHAT_PREFIX = "enterprise_wechat_";
/**
*
*/
public static final String WECHAT_OFFICIAL_ACCOUNT_KEY = "officialAccount";
public static final String WECHAT_OFFICIAL__PREFIX = "official_";
/**
*
*/
public static final String WECHAT_MINI_PROGRAM_ACCOUNT_KEY = "miniProgramAccount";
public static final String WECHAT_MINI_PROGRAM_PREFIX = "mini_program_";
/**
*
@ -46,9 +72,5 @@ public class SendAccountConstant {
public static final String SMS_ACCOUNT_KEY = "smsAccount";
public static final String SMS_PREFIX = "sms_";
/**
* 1010
*/
public static final Integer START = 10;
public static final Integer STEP = 10;
}

@ -0,0 +1,30 @@
package com.java3y.austin.common.dto.account;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
*
* @author 3y
* <p>
* api
* <p>
* https://docs.getui.com/getui/start/devcenter/
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class GeTuiAccount {
private String appId;
private String appKey;
private String masterSecret;
}

@ -0,0 +1,44 @@
package com.java3y.austin.common.dto.account;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
*
* <p>
*
* https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/subscribe-message/subscribeMessage.send.html
* * @author sunql
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class WeChatMiniProgramAccount {
/**
* ID
*/
private String templateId;
/**
* developertrialformal
*/
private String miniProgramState;
/**
* ,index?foo=bar
*/
private String page;
/**
*
*/
private String appId;
private String appSecret;
private String grantType;
}

@ -19,17 +19,7 @@ import java.util.Map;
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class WechatOfficialAccount {
/**
* openId
*/
private String openId;
/**
* 使Id
*/
private String templateId;
public class WeChatOfficialAccount {
/**
* url
@ -47,7 +37,9 @@ public class WechatOfficialAccount {
private String path;
/**
*
*
*/
private Map<String, String> map;
private String appId;
private String secret;
private String templateId;
}

@ -1,8 +1,22 @@
package com.java3y.austin.common.dto.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
* @author 3y
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MiniProgramContentModel extends ContentModel {
/**
*
*/
Map<String, String> map;
}

@ -1,9 +1,23 @@
package com.java3y.austin.common.dto.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author 3y
*
*
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class PushContentModel extends ContentModel {
private String title;
private String content;
private String url;
}

@ -0,0 +1,37 @@
package com.java3y.austin.cron.dto.getui;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author 3y
* @date 2022/5/8
* <p>
* https://docs.getui.com/getui/server/rest_v2/token/
*/
@NoArgsConstructor
@Data
@AllArgsConstructor
@Builder
public class GeTuiTokenResultDTO {
@JSONField(name = "msg")
private String msg;
@JSONField(name = "code")
private Integer code;
@JSONField(name = "data")
private DataDTO data;
@NoArgsConstructor
@Data
public static class DataDTO {
@JSONField(name = "expire_time")
private String expireTime;
@JSONField(name = "token")
private String token;
}
}

@ -0,0 +1,40 @@
package com.java3y.austin.cron.dto.getui;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* token
* @author 3y
* https://docs.getui.com/getui/server/rest_v2/token/
*/
@NoArgsConstructor
@Data
@Builder
@AllArgsConstructor
public class QueryTokenParamDTO {
/**
* sign
*/
@JSONField(name = "sign")
private String sign;
/**
* timestamp
*/
@JSONField(name = "timestamp")
private String timestamp;
/**
* appkey
*/
@JSONField(name = "appkey")
private String appKey;
}

@ -46,7 +46,7 @@ public class RefreshDingDingAccessTokenHandler {
log.info("refreshAccessTokenJob#execute!");
SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> {
for (int index = SendAccountConstant.START; true; index = index + SendAccountConstant.STEP) {
DingDingWorkNoticeAccount account = accountUtils.getAccount(index, SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, new DingDingWorkNoticeAccount());
DingDingWorkNoticeAccount account = accountUtils.getAccount(index, SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, DingDingWorkNoticeAccount.class);
if (account == null) {
break;
}

@ -0,0 +1,91 @@
package com.java3y.austin.cron.handler;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.dto.account.GeTuiAccount;
import com.java3y.austin.cron.dto.getui.QueryTokenParamDTO;
import com.java3y.austin.cron.dto.getui.GeTuiTokenResultDTO;
import com.java3y.austin.support.config.SupportThreadPoolConfig;
import com.java3y.austin.support.utils.AccountUtils;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* token
* <p>
* https://docs.getui.com/getui/server/rest_v2/token/
*
* @author 3y
*/
@Service
@Slf4j
public class RefreshGeTuiAccessTokenHandler {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private AccountUtils accountUtils;
/**
* )
*/
@XxlJob("refreshGeTuiAccessTokenJob")
public void execute() {
log.info("refreshGeTuiAccessTokenJob#execute!");
SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> {
for (int index = SendAccountConstant.START; true; index = index + SendAccountConstant.STEP) {
GeTuiAccount account = accountUtils.getAccount(index, SendAccountConstant.GE_TUI_ACCOUNT_KEY, SendAccountConstant.GE_TUI_ACCOUNT_PREFIX, GeTuiAccount.class);
if (account == null) {
break;
}
String accessToken = getAccessToken(account);
if (StrUtil.isNotBlank(accessToken)) {
redisTemplate.opsForValue().set(SendAccountConstant.GE_TUI_ACCESS_TOKEN_PREFIX + index, accessToken);
}
}
});
}
/**
* access_token
*
* @param account
* @return
*/
private String getAccessToken(GeTuiAccount account) {
String accessToken = "";
try {
String url = "https://restapi.getui.com/v2/" + account.getAppId() + "/auth";
String time = String.valueOf(System.currentTimeMillis());
String digest = SecureUtil.sha256().digestHex(account.getAppKey() + time + account.getMasterSecret());
QueryTokenParamDTO param = QueryTokenParamDTO.builder()
.timestamp(time)
.appKey(account.getAppKey())
.sign(digest).build();
String body = HttpRequest.post(url).header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue())
.body(JSON.toJSONString(param))
.timeout(20000)
.execute().body();
GeTuiTokenResultDTO geTuiTokenResultDTO = JSON.parseObject(body, GeTuiTokenResultDTO.class);
if (geTuiTokenResultDTO.getCode().equals(0)) {
accessToken = geTuiTokenResultDTO.getData().getToken();
}
} catch (Exception e) {
log.error("RefreshGeTuiAccessTokenHandler#getAccessToken fail:{}", Throwables.getStackTraceAsString(e));
}
return accessToken;
}
}

@ -46,6 +46,11 @@
<groupId>com.github.binarywang</groupId>
<artifactId>weixin-java-mp</artifactId>
</dependency>
<!--微信小程序第三方SDK-->
<dependency>
<groupId>com.github.binarywang</groupId>
<artifactId>weixin-java-miniapp</artifactId>
</dependency>
<!--企业微信发送消息-->
<dependency>
@ -54,4 +59,4 @@
</dependency>
</dependencies>
</project>
</project>

@ -1,4 +1,4 @@
package com.java3y.austin.handler.domain;
package com.java3y.austin.handler.deduplication;
import com.alibaba.fastjson.annotation.JSONField;
import com.java3y.austin.common.domain.TaskInfo;

@ -5,7 +5,6 @@ import com.ctrip.framework.apollo.spring.annotation.ApolloConfig;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.domain.DeduplicationParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@ -3,7 +3,7 @@ package com.java3y.austin.handler.deduplication.build;
import com.alibaba.fastjson.JSONObject;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationHolder;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;

@ -1,7 +1,7 @@
package com.java3y.austin.handler.deduplication.build;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
/**
* @author luohaojie

@ -3,7 +3,7 @@ package com.java3y.austin.handler.deduplication.build;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import org.springframework.stereotype.Service;

@ -4,7 +4,7 @@ import cn.hutool.core.date.DateUtil;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import org.springframework.stereotype.Service;
import java.util.Date;

@ -0,0 +1,36 @@
package com.java3y.austin.handler.deduplication.limit;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService;
import java.util.*;
/**
* @author cao
* @date 2022-04-20 12:00
*/
public abstract class AbstractLimitService implements LimitService {
/**
* Key
*
* @param taskInfo
* @return
*/
protected List<String> deduplicationAllKey(AbstractDeduplicationService service, TaskInfo taskInfo) {
List<String> result = new ArrayList<>(taskInfo.getReceiver().size());
for (String receiver : taskInfo.getReceiver()) {
String key = deduplicationSingleKey(service, taskInfo, receiver);
result.add(key);
}
return result;
}
protected String deduplicationSingleKey(AbstractDeduplicationService service, TaskInfo taskInfo, String receiver) {
return service.deduplicationSingleKey(taskInfo, receiver);
}
}

@ -0,0 +1,25 @@
package com.java3y.austin.handler.deduplication.limit;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService;
import java.util.Set;
/**
* @author cao
* @date 2022-04-20 11:58
*/
public interface LimitService {
/**
*
* @param service
* @param taskInfo
* @param param
* @return
*/
Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param);
}

@ -0,0 +1,77 @@
package com.java3y.austin.handler.deduplication.limit;
import cn.hutool.core.collection.CollUtil;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService;
import com.java3y.austin.support.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
/**
* N
* @author cao
* @date 2022-04-20 13:41
*/
@Service(value = "SimpleLimitService")
public class SimpleLimitService extends AbstractLimitService {
private static final String LIMIT_TAG = "SP_";
@Autowired
private RedisUtils redisUtils;
@Override
public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) {
Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
// 获取redis记录
Map<String, String> readyPutRedisReceiver = new HashMap(taskInfo.getReceiver().size());
//redis数据隔离
List<String> keys = deduplicationAllKey(service, taskInfo).stream().map(key -> LIMIT_TAG + key).collect(Collectors.toList());
Map<String, String> inRedisValue = redisUtils.mGet(keys);
for (String receiver : taskInfo.getReceiver()) {
String key = LIMIT_TAG + deduplicationSingleKey(service, taskInfo, receiver);
String value = inRedisValue.get(key);
// 符合条件的用户
if (value != null && Integer.parseInt(value) >= param.getCountNum()) {
filterReceiver.add(receiver);
} else {
readyPutRedisReceiver.put(receiver, key);
}
}
// 不符合条件的用户需要更新Redis(无记录添加,有记录则累加次数)
putInRedis(readyPutRedisReceiver, inRedisValue, param.getDeduplicationTime());
return filterReceiver;
}
/**
* redis
*
* @param readyPutRedisReceiver
*/
private void putInRedis(Map<String, String> readyPutRedisReceiver,
Map<String, String> inRedisValue, Long deduplicationTime) {
Map<String, String> keyValues = new HashMap<>(readyPutRedisReceiver.size());
for (Map.Entry<String, String> entry : readyPutRedisReceiver.entrySet()) {
String key = entry.getValue();
if (inRedisValue.get(key) != null) {
keyValues.put(key, String.valueOf(Integer.valueOf(inRedisValue.get(key)) + 1));
} else {
keyValues.put(key, String.valueOf(AustinConstant.TRUE));
}
}
if (CollUtil.isNotEmpty(keyValues)) {
redisUtils.pipelineSetEx(keyValues, deduplicationTime);
}
}
}

@ -0,0 +1,69 @@
package com.java3y.austin.handler.deduplication.limit;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import com.java3y.austin.handler.deduplication.service.AbstractDeduplicationService;
import com.java3y.austin.support.utils.RedisUtils;
import com.java3y.austin.support.utils.SnowFlakeIdUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
/**
* N
* @author cao
* @date 2022-04-20 11:34
*/
@Service(value = "SlideWindowLimitService")
public class SlideWindowLimitService extends AbstractLimitService {
private static final String LIMIT_TAG = "SW_";
@Autowired
private RedisUtils redisUtils;
private SnowFlakeIdUtils snowFlakeIdUtils = new SnowFlakeIdUtils(1, 1);
private DefaultRedisScript<Long> redisScript;
@PostConstruct
public void init() {
redisScript = new DefaultRedisScript();
redisScript.setResultType(Long.class);
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("limit.lua")));
}
/**
* @param service
* @param taskInfo
* @param param
* @return
*/
@Override
public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) {
Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
long nowTime = System.currentTimeMillis();
for (String receiver : taskInfo.getReceiver()) {
String key = LIMIT_TAG + deduplicationSingleKey(service, taskInfo, receiver);
String scoreValue = String.valueOf(snowFlakeIdUtils.nextId());
String score = String.valueOf(nowTime);
if (redisUtils.execLimitLua(redisScript, Arrays.asList(key), String.valueOf(param.getDeduplicationTime() * 1000), score, String.valueOf(param.getCountNum()), scoreValue)) {
filterReceiver.add(receiver);
}
}
return filterReceiver;
}
}

@ -1,13 +1,12 @@
package com.java3y.austin.handler.deduplication.service;
import cn.hutool.core.collection.CollUtil;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.deduplication.DeduplicationHolder;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
import com.java3y.austin.handler.deduplication.limit.LimitService;
import com.java3y.austin.support.utils.LogUtils;
import com.java3y.austin.support.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -21,8 +20,11 @@ import java.util.*;
*/
@Slf4j
public abstract class AbstractDeduplicationService implements DeduplicationService {
protected Integer deduplicationType;
protected LimitService limitService;
@Autowired
private DeduplicationHolder deduplicationHolder;
@ -31,8 +33,6 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi
deduplicationHolder.putService(deduplicationType, this);
}
@Autowired
private RedisUtils redisUtils;
@Autowired
private LogUtils logUtils;
@ -40,27 +40,8 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi
@Override
public void deduplication(DeduplicationParam param) {
TaskInfo taskInfo = param.getTaskInfo();
Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
// 获取redis记录
Set<String> readyPutRedisReceiver = new HashSet<>(taskInfo.getReceiver().size());
List<String> keys = deduplicationAllKey(taskInfo);
Map<String, String> inRedisValue = redisUtils.mGet(keys);
for (String receiver : taskInfo.getReceiver()) {
String key = deduplicationSingleKey(taskInfo, receiver);
String value = inRedisValue.get(key);
// 符合条件的用户
if (value != null && Integer.parseInt(value) >= param.getCountNum()) {
filterReceiver.add(receiver);
} else {
readyPutRedisReceiver.add(receiver);
}
}
// 不符合条件的用户需要更新Redis(无记录添加,有记录则累加次数)
putInRedis(readyPutRedisReceiver, inRedisValue, param);
Set<String> filterReceiver = limitService.limitFilter(this, taskInfo, param);
// 剔除符合去重条件的用户
if (CollUtil.isNotEmpty(filterReceiver)) {
@ -77,44 +58,7 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi
* @param receiver
* @return
*/
protected abstract String deduplicationSingleKey(TaskInfo taskInfo, String receiver);
/**
* redis
*
* @param readyPutRedisReceiver
*/
private void putInRedis(Set<String> readyPutRedisReceiver,
Map<String, String> inRedisValue, DeduplicationParam param) {
Map<String, String> keyValues = new HashMap<>(readyPutRedisReceiver.size());
for (String receiver : readyPutRedisReceiver) {
String key = deduplicationSingleKey(param.getTaskInfo(), receiver);
if (inRedisValue.get(key) != null) {
keyValues.put(key, String.valueOf(Integer.valueOf(inRedisValue.get(key)) + 1));
} else {
keyValues.put(key, String.valueOf(AustinConstant.TRUE));
}
}
if (CollUtil.isNotEmpty(keyValues)) {
redisUtils.pipelineSetEx(keyValues, param.getDeduplicationTime());
}
}
/**
* Key
*
* @param taskInfo
* @return
*/
private List<String> deduplicationAllKey(TaskInfo taskInfo) {
List<String> result = new ArrayList<>(taskInfo.getReceiver().size());
for (String receiver : taskInfo.getReceiver()) {
String key = deduplicationSingleKey(taskInfo, receiver);
result.add(key);
}
return result;
}
public abstract String deduplicationSingleKey(TaskInfo taskInfo, String receiver);
}

@ -4,6 +4,9 @@ import cn.hutool.crypto.digest.DigestUtil;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.deduplication.limit.LimitService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
/**
@ -14,7 +17,10 @@ import org.springframework.stereotype.Service;
@Service
public class ContentDeduplicationService extends AbstractDeduplicationService {
public ContentDeduplicationService() {
@Autowired
public ContentDeduplicationService(@Qualifier("SlideWindowLimitService") LimitService limitService) {
this.limitService = limitService;
deduplicationType = DeduplicationType.CONTENT.getCode();
}

@ -1,7 +1,7 @@
package com.java3y.austin.handler.deduplication.service;
import com.java3y.austin.handler.domain.DeduplicationParam;
import com.java3y.austin.handler.deduplication.DeduplicationParam;
/**
* @author huskey

@ -3,8 +3,12 @@ package com.java3y.austin.handler.deduplication.service;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.DeduplicationType;
import com.java3y.austin.handler.deduplication.limit.LimitService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
/**
* @author 3y
* @date 2021/12/12
@ -13,8 +17,13 @@ import org.springframework.stereotype.Service;
@Service
public class FrequencyDeduplicationService extends AbstractDeduplicationService {
public FrequencyDeduplicationService() {
@Autowired
public FrequencyDeduplicationService(@Qualifier("SimpleLimitService") LimitService limitService) {
this.limitService = limitService;
deduplicationType = DeduplicationType.FREQUENCY.getCode();
}
private static final String PREFIX = "FRE";
@ -33,7 +42,7 @@ public class FrequencyDeduplicationService extends AbstractDeduplicationService
@Override
public String deduplicationSingleKey(TaskInfo taskInfo, String receiver) {
return PREFIX + StrUtil.C_UNDERLINE
+ receiver + StrUtil.C_UNDERLINE
+ receiver + StrUtil.C_UNDERLINE
+ taskInfo.getMessageTemplateId() + StrUtil.C_UNDERLINE
+ taskInfo.getSendChannel();
}

@ -0,0 +1,33 @@
package com.java3y.austin.handler.domain.push;
import com.java3y.austin.common.domain.TaskInfo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Set;
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class PushParam {
/**
* token
*/
private String token;
/**
* appId
*/
private String appId;
/**
*
*/
private TaskInfo taskInfo;
}

@ -0,0 +1,54 @@
package com.java3y.austin.handler.domain.push.getui;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Set;
/**
* param
*
* @author 3y
* https://docs.getui.com/getui/server/rest_v2/push/
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class BatchSendPushParam {
/**
* audience
*/
@JSONField(name = "audience")
private AudienceVO audience;
/**
* taskid
*/
@JSONField(name = "taskid")
private String taskId;
/**
* isAsync
*/
@JSONField(name = "is_async")
private Boolean isAsync;
/**
* AudienceVO
*/
@NoArgsConstructor
@Data
@Builder
@AllArgsConstructor
public static class AudienceVO {
/**
* cid
*/
@JSONField(name = "cid")
private Set<String> cid;
}
}

@ -0,0 +1,115 @@
package com.java3y.austin.handler.domain.push.getui;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Set;
/**
* param
* @author 3y
* https://docs.getui.com/getui/server/rest_v2/push/
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class SendPushParam {
/**
* requestId
*/
@JSONField(name = "request_id")
private String requestId;
/**
* settings
*/
@JSONField(name = "settings")
private SettingsVO settings;
/**
* audience
*/
@JSONField(name = "audience")
private AudienceVO audience;
/**
* pushMessage
*/
@JSONField(name = "push_message")
private PushMessageVO pushMessage;
/**
* SettingsVO
*/
@NoArgsConstructor
@Data
public static class SettingsVO {
/**
* ttl
*/
@JSONField(name = "ttl")
private Integer ttl;
}
/**
* AudienceVO
*/
@NoArgsConstructor
@Data
@AllArgsConstructor
@Builder
public static class AudienceVO {
/**
* cid
*/
@JSONField(name = "cid")
private Set<String> cid;
}
/**
* PushMessageVO
*/
@NoArgsConstructor
@Data
@AllArgsConstructor
@Builder
public static class PushMessageVO {
/**
* notification
*/
@JSONField(name = "notification")
private NotificationVO notification;
/**
* NotificationVO
*/
@NoArgsConstructor
@Data
@AllArgsConstructor
@Builder
public static class NotificationVO {
/**
* title
*/
@JSONField(name = "title")
private String title;
/**
* body
*/
@JSONField(name = "body")
private String body;
/**
* clickType
*/
@JSONField(name = "click_type")
private String clickType;
/**
* url
*/
@JSONField(name = "url")
private String url;
}
}
}

@ -0,0 +1,38 @@
package com.java3y.austin.handler.domain.push.getui;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* @author 3y
* https://docs.getui.com/getui/server/rest_v2/common_args/?id=doc-title-1
*/
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SendPushResult {
/**
* msg
*/
@JSONField(name = "msg")
private String msg;
/**
* code
*/
@JSONField(name = "code")
private Integer code;
/**
* data
*/
@JSONField(name = "data")
private JSONObject data;
}

@ -0,0 +1,38 @@
package com.java3y.austin.handler.domain.wechat;
import lombok.Builder;
import lombok.Data;
import java.util.Map;
import java.util.Set;
/**
* @author sunql
* @date 20220506 15:56
*
*
*/
@Data
@Builder
public class WeChatMiniProgramParam {
/**
* Id
*/
private Long messageTemplateId;
/**
*
*/
private Integer sendAccount;
/**
* openid
*/
private Set<String> openIds;
/**
* { "key1": { "value": any }, "key2": { "value": any } }
*/
private Map<String, String> data;
}

@ -0,0 +1,37 @@
package com.java3y.austin.handler.domain.wechat;
import lombok.Builder;
import lombok.Data;
import java.util.Map;
import java.util.Set;
/**
* @author sunql
* @date 20220506 9:56
*
*
*/
@Data
@Builder
public class WeChatOfficialParam {
/**
* Id
*/
private Long messageTemplateId;
/**
*
*/
private Set<String> openIds;
/**
*
*/
private Map<String, String> data;
/**
*
*/
private Integer sendAccount;
}

@ -0,0 +1,26 @@
package com.java3y.austin.handler.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
/**
*
*
* @author 3y
*/
@Getter
@ToString
@AllArgsConstructor
public enum RateLimitStrategy {
REQUEST_RATE_LIMIT(10, "根据真实请求数限流"),
SEND_USER_NUM_RATE_LIMIT(20, "根据发送用户数限流"),
;
private Integer code;
private String description;
}

@ -0,0 +1,39 @@
package com.java3y.austin.handler.flowcontrol;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author 3y
* @date 2022/4/18
* <p>
*
*/
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class FlowControlParam {
/**
*
*
*/
protected RateLimiter rateLimiter;
/**
*
*
*/
protected Double rateInitValue;
/**
*
*
*/
protected RateLimitStrategy rateLimitStrategy;
}

@ -0,0 +1,20 @@
package com.java3y.austin.handler.flowcontrol;
import com.java3y.austin.common.domain.TaskInfo;
/**
* @author 3y
*
*/
public interface FlowControlService {
/**
*
*
* @param taskInfo
* @param flowControlParam
*/
void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam);
}

@ -0,0 +1,77 @@
package com.java3y.austin.handler.flowcontrol.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.spring.annotation.ApolloConfig;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.constant.AustinConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author 3y
* @date 2022/4/18
*/
@Service
@Slf4j
public class FlowControlServiceImpl implements FlowControlService {
private static final String FLOW_CONTROL_KEY = "flowControl";
private static final String FLOW_CONTROL_PREFIX = "flow_control_";
@ApolloConfig("boss.austin")
private Config config;
@Override
public void flowControl(TaskInfo taskInfo, FlowControlParam flowControlParam) {
RateLimiter rateLimiter = flowControlParam.getRateLimiter();
Double rateInitValue = flowControlParam.getRateInitValue();
double costTime = 0;
// 对比 初始限流值 与 配置限流值,以 配置中心的限流值为准
Double rateLimitConfig = getRateLimitConfig(taskInfo.getSendChannel());
if (rateLimitConfig != null && !rateInitValue.equals(rateLimitConfig)) {
rateLimiter = RateLimiter.create(rateLimitConfig);
flowControlParam.setRateInitValue(rateLimitConfig);
flowControlParam.setRateLimiter(rateLimiter);
}
if (RateLimitStrategy.REQUEST_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) {
costTime = rateLimiter.acquire(1);
}
if (RateLimitStrategy.SEND_USER_NUM_RATE_LIMIT.equals(flowControlParam.getRateLimitStrategy())) {
costTime = rateLimiter.acquire(taskInfo.getReceiver().size());
}
if (costTime > 0) {
log.info("consumer {} flow control time {}",
ChannelType.getEnumByCode(taskInfo.getSendChannel()).getDescription(), costTime);
}
}
/**
*
* <p>
* apollo keyflowControl value{"flow_control_40":1}
* <p>
* com.java3y.austin.common.enums.ChannelType
*
* @param channelCode
*/
private Double getRateLimitConfig(Integer channelCode) {
String flowControlConfig = config.getProperty(FLOW_CONTROL_KEY, AustinConstant.APOLLO_DEFAULT_VALUE_JSON_OBJECT);
JSONObject jsonObject = JSON.parseObject(flowControlConfig);
if (jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode) == null) {
return null;
}
return jsonObject.getDouble(FLOW_CONTROL_PREFIX + channelCode);
}
}

@ -1,8 +1,12 @@
package com.java3y.austin.handler.handler;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.AnchorState;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.flowcontrol.FlowControlService;
import com.java3y.austin.support.utils.LogUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -13,6 +17,12 @@ import javax.annotation.PostConstruct;
* handler
*/
public abstract class BaseHandler implements Handler {
@Autowired
private HandlerHolder handlerHolder;
@Autowired
private LogUtils logUtils;
@Autowired
private FlowControlService flowControlService;
/**
* Code
@ -20,12 +30,11 @@ public abstract class BaseHandler implements Handler {
*/
protected Integer channelCode;
@Autowired
private HandlerHolder handlerHolder;
@Autowired
private LogUtils logUtils;
/**
*
*
*/
protected FlowControlParam flowControlParam;
/**
* Handler
@ -35,8 +44,20 @@ public abstract class BaseHandler implements Handler {
handlerHolder.putHandler(channelCode, this);
}
/**
*
*
* @param taskInfo
*/
public void flowControl(TaskInfo taskInfo) {
// 只有子类指定了限流参数,才需要限流
if (flowControlParam != null) {
flowControlService.flowControl(taskInfo, flowControlParam);
}
}
@Override
public void doHandler(TaskInfo taskInfo) {
flowControl(taskInfo);
if (handler(taskInfo)) {
logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
return;
@ -44,6 +65,9 @@ public abstract class BaseHandler implements Handler {
logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
}
/**
* handler
*
@ -52,4 +76,6 @@ public abstract class BaseHandler implements Handler {
*/
public abstract boolean handler(TaskInfo taskInfo);
}

@ -45,7 +45,7 @@ public class DingDingRobotHandler extends BaseHandler implements Handler {
@Override
public boolean handler(TaskInfo taskInfo) {
try {
DingDingRobotAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.DING_DING_ROBOT_ACCOUNT_KEY, SendAccountConstant.DING_DING_ROBOT_PREFIX, new DingDingRobotAccount());
DingDingRobotAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.DING_DING_ROBOT_ACCOUNT_KEY, SendAccountConstant.DING_DING_ROBOT_PREFIX, DingDingRobotAccount.class);
DingDingRobotParam dingDingRobotParam = assembleParam(taskInfo);
String httpResult = HttpUtil.post(assembleParamUrl(account), JSON.toJSONString(dingDingRobotParam));
DingDingRobotResult dingDingRobotResult = JSON.parseObject(httpResult, DingDingRobotResult.class);

@ -50,7 +50,7 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler {
@Override
public boolean handler(TaskInfo taskInfo) {
try {
DingDingWorkNoticeAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, new DingDingWorkNoticeAccount());
DingDingWorkNoticeAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, DingDingWorkNoticeAccount.class);
OapiMessageCorpconversationAsyncsendV2Request request = assembleParam(account, taskInfo);
String accessToken = redisTemplate.opsForValue().get(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + taskInfo.getSendAccount());
OapiMessageCorpconversationAsyncsendV2Response response = new DefaultDingTalkClient(URL).execute(request, accessToken);

@ -4,10 +4,13 @@ package com.java3y.austin.handler.handler.impl;
import cn.hutool.extra.mail.MailAccount;
import cn.hutool.extra.mail.MailUtil;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.RateLimiter;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.model.EmailContentModel;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.enums.RateLimitStrategy;
import com.java3y.austin.handler.flowcontrol.FlowControlParam;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.handler.Handler;
import com.java3y.austin.support.utils.AccountUtils;
@ -30,6 +33,13 @@ public class EmailHandler extends BaseHandler implements Handler {
public EmailHandler() {
channelCode = ChannelType.EMAIL.getCode();
// 按照请求限流,默认单机 3 qps 具体数值配置在apollo动态调整)
Double rateInitValue = Double.valueOf(3);
flowControlParam = FlowControlParam.builder().rateInitValue(rateInitValue)
.rateLimitStrategy(RateLimitStrategy.REQUEST_RATE_LIMIT)
.rateLimiter(RateLimiter.create(rateInitValue)).build();
}
@Override
@ -52,7 +62,7 @@ public class EmailHandler extends BaseHandler implements Handler {
* @return
*/
private MailAccount getAccountConfig(Integer sendAccount) {
MailAccount account = accountUtils.getAccount(sendAccount, SendAccountConstant.EMAIL_ACCOUNT_KEY, SendAccountConstant.EMAIL_ACCOUNT_PREFIX, new MailAccount());
MailAccount account = accountUtils.getAccount(sendAccount, SendAccountConstant.EMAIL_ACCOUNT_KEY, SendAccountConstant.EMAIL_ACCOUNT_PREFIX, MailAccount.class);
try {
MailSSLSocketFactory sf = new MailSSLSocketFactory();
sf.setTrustAllHosts(true);

@ -46,7 +46,7 @@ public class EnterpriseWeChatHandler extends BaseHandler implements Handler {
@Override
public boolean handler(TaskInfo taskInfo) {
try {
WxCpDefaultConfigImpl accountConfig = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.ENTERPRISE_WECHAT_ACCOUNT_KEY, SendAccountConstant.ENTERPRISE_WECHAT_PREFIX, new WxCpDefaultConfigImpl());
WxCpDefaultConfigImpl accountConfig = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.ENTERPRISE_WECHAT_ACCOUNT_KEY, SendAccountConstant.ENTERPRISE_WECHAT_PREFIX, WxCpDefaultConfigImpl.class);
WxCpMessageServiceImpl messageService = new WxCpMessageServiceImpl(initService(accountConfig));
WxCpMessageSendResult result = messageService.send(buildWxCpMessage(taskInfo, accountConfig.getAgentId()));
if (Integer.valueOf(WxMpErrorMsgEnum.CODE_0.getCode()).equals(result.getErrCode())) {

@ -0,0 +1,67 @@
package com.java3y.austin.handler.handler.impl;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.model.MiniProgramContentModel;
import com.java3y.austin.common.dto.model.OfficialAccountsContentModel;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.domain.wechat.WeChatMiniProgramParam;
import com.java3y.austin.handler.domain.wechat.WeChatOfficialParam;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.handler.Handler;
import com.java3y.austin.handler.script.MiniProgramAccountService;
import com.java3y.austin.handler.script.OfficialAccountService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author sunql
*
*/
@Component
@Slf4j
public class MiniProgramAccountHandler extends BaseHandler implements Handler {
@Autowired
private MiniProgramAccountService miniProgramAccountService;
public MiniProgramAccountHandler() {
channelCode = ChannelType.MINI_PROGRAM.getCode();
}
@Override
public boolean handler(TaskInfo taskInfo) {
WeChatMiniProgramParam miniProgramParam = buildMiniProgramParam(taskInfo);
try {
miniProgramAccountService.send(miniProgramParam);
} catch (Exception e) {
log.error("MiniProgramAccountHandler#handler fail:{},params:{}",
Throwables.getStackTraceAsString(e), JSON.toJSONString(taskInfo));
return false;
}
return true;
}
/**
* taskInfo
*
* @param taskInfo
* @return
*/
private WeChatMiniProgramParam buildMiniProgramParam(TaskInfo taskInfo) {
// 小程序订阅消息可以关联到系统业务,通过接口查询。
WeChatMiniProgramParam miniProgramParam = WeChatMiniProgramParam.builder()
.openIds(taskInfo.getReceiver())
.messageTemplateId(taskInfo.getMessageTemplateId())
.sendAccount(taskInfo.getSendAccount())
.build();
MiniProgramContentModel contentModel = (MiniProgramContentModel) taskInfo.getContentModel();
miniProgramParam.setData(contentModel.getMap());
return miniProgramParam;
}
}

@ -5,19 +5,15 @@ import com.google.common.base.Throwables;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.model.OfficialAccountsContentModel;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.domain.wechat.WeChatOfficialParam;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.handler.Handler;
import com.java3y.austin.handler.script.OfficialAccountService;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.mp.bean.template.WxMpTemplateData;
import me.chanjar.weixin.mp.bean.template.WxMpTemplateMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author zyg
@ -30,18 +26,24 @@ public class OfficialAccountHandler extends BaseHandler implements Handler {
@Autowired
private OfficialAccountService officialAccountService;
public OfficialAccountHandler() {
channelCode = ChannelType.OFFICIAL_ACCOUNT.getCode();
}
@Override
public boolean handler(TaskInfo taskInfo) {
// 构建微信模板消息
OfficialAccountsContentModel contentModel = (OfficialAccountsContentModel) taskInfo.getContentModel();
WeChatOfficialParam officialParam = WeChatOfficialParam.builder()
.openIds(taskInfo.getReceiver())
.messageTemplateId(taskInfo.getMessageTemplateId())
.sendAccount(taskInfo.getSendAccount())
.data(contentModel.getMap())
.build();
List<WxMpTemplateMessage> mpTemplateMessages = buildTemplateMsg(taskInfo);
// 微信模板消息需要记录响应结果
try {
List<String> messageIds = officialAccountService.send(mpTemplateMessages);
List<String> messageIds = officialAccountService.send(officialParam);
log.info("OfficialAccountHandler#handler successfully messageIds:{}", messageIds);
return true;
} catch (Exception e) {
@ -51,45 +53,5 @@ public class OfficialAccountHandler extends BaseHandler implements Handler {
return false;
}
/**
* taskInfo
*
* @param taskInfo
* @return
*/
private List<WxMpTemplateMessage> buildTemplateMsg(TaskInfo taskInfo) {
// 需是关注公众号的用户的OpenId
Set<String> receiver = taskInfo.getReceiver();
Long messageTemplateId = taskInfo.getMessageTemplateId();
// 微信模板消息可以关联到系统业务,通过接口查询。
String templateId = getRealWxMpTemplateId(messageTemplateId);
List<WxMpTemplateMessage> wxMpTemplateMessages = new ArrayList<>(receiver.size());
OfficialAccountsContentModel contentModel = (OfficialAccountsContentModel) taskInfo.getContentModel();
String url = contentModel.getUrl();
Map<String, String> param = contentModel.getMap();
// 构建微信模板消息
for (String openId : receiver) {
WxMpTemplateMessage templateMessage = WxMpTemplateMessage.builder()
.toUser(openId)
.templateId(templateId)
.url(url)
.build();
// WxMpTemplateData 对应模板消息 键 -- 值 -- color
param.forEach((k, v) -> templateMessage.addData(new WxMpTemplateData(k, v)));
wxMpTemplateMessages.add(templateMessage);
}
return wxMpTemplateMessages;
}
/**
* idid
*
* @param messageTemplateId id
* @return
*/
private String getRealWxMpTemplateId(Long messageTemplateId) {
return String.valueOf(messageTemplateId);
}
}

@ -0,0 +1,164 @@
package com.java3y.austin.handler.handler.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.dto.account.GeTuiAccount;
import com.java3y.austin.common.dto.model.PushContentModel;
import com.java3y.austin.common.enums.ChannelType;
import com.java3y.austin.handler.domain.push.PushParam;
import com.java3y.austin.handler.domain.push.getui.BatchSendPushParam;
import com.java3y.austin.handler.domain.push.getui.SendPushParam;
import com.java3y.austin.handler.domain.push.getui.SendPushResult;
import com.java3y.austin.handler.handler.BaseHandler;
import com.java3y.austin.handler.handler.Handler;
import com.java3y.austin.support.utils.AccountUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Set;
/**
*
* <p>
* ()
*
* @author 3y
*/
@Component
@Slf4j
public class PushHandler extends BaseHandler implements Handler {
private static final String BASE_URL = "https://restapi.getui.com/v2/";
private static final String SINGLE_PUSH_PATH = "/push/single/cid";
private static final String BATCH_PUSH_CREATE_TASK_PATH = "/push/list/message";
private static final String BATCH_PUSH_PATH = "/push/list/cid";
public PushHandler() {
channelCode = ChannelType.PUSH.getCode();
}
@Autowired
private AccountUtils accountUtils;
@Autowired
private StringRedisTemplate redisTemplate;
@Override
public boolean handler(TaskInfo taskInfo) {
try {
GeTuiAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.GE_TUI_ACCOUNT_KEY, SendAccountConstant.GE_TUI_ACCOUNT_PREFIX, GeTuiAccount.class);
String token = redisTemplate.opsForValue().get(SendAccountConstant.GE_TUI_ACCESS_TOKEN_PREFIX + taskInfo.getSendAccount());
PushParam pushParam = PushParam.builder().token(token).appId(account.getAppId()).taskInfo(taskInfo).build();
String result;
if (taskInfo.getReceiver().size() == 1) {
result = singlePush(pushParam);
} else {
result = batchPush(createTaskId(pushParam), pushParam);
}
SendPushResult sendPushResult = JSON.parseObject(result, SendPushResult.class);
if (sendPushResult.getCode().equals(0)) {
return true;
}
// 常见的错误 应当 关联至 AnchorState,由austin后台统一透出失败原因
log.error("PushHandler#handler fail!result:{},params:{}", JSON.toJSONString(sendPushResult), JSON.toJSONString(taskInfo));
} catch (Exception e) {
log.error("PushHandler#handler fail!e:{},params:{}", Throwables.getStackTraceAsString(e), JSON.toJSONString(taskInfo));
}
return false;
}
/**
*
* @param pushParam
* @return http result
*/
private String singlePush(PushParam pushParam) {
String url = BASE_URL + pushParam.getAppId() + SINGLE_PUSH_PATH;
SendPushParam sendPushParam = assembleParam((PushContentModel) pushParam.getTaskInfo().getContentModel(), pushParam.getTaskInfo().getReceiver());
String body = HttpRequest.post(url).header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue())
.header("token", pushParam.getToken())
.body(JSON.toJSONString(sendPushParam))
.timeout(2000)
.execute().body();
return body;
}
/**
*
*
* @param taskId Id
* @param pushParam
* @return
*/
private String batchPush(String taskId, PushParam pushParam) {
String url = BASE_URL + pushParam.getAppId() + BATCH_PUSH_PATH;
BatchSendPushParam batchSendPushParam = BatchSendPushParam.builder()
.taskId(taskId)
.isAsync(true)
.audience(BatchSendPushParam.AudienceVO.builder().cid(pushParam.getTaskInfo().getReceiver()).build()).build();
String body = HttpRequest.post(url).header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue())
.header("token", pushParam.getToken())
.body(JSON.toJSONString(batchSendPushParam))
.timeout(2000)
.execute().body();
return body;
}
/**
* taskId
* @param pushParam
* @return http result
*/
private String createTaskId(PushParam pushParam) {
String url = BASE_URL + pushParam.getAppId() + BATCH_PUSH_CREATE_TASK_PATH;
SendPushParam param = assembleParam((PushContentModel) pushParam.getTaskInfo().getContentModel());
String taskId = "";
try {
String body = HttpRequest.post(url).header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue())
.header("token", pushParam.getToken())
.body(JSON.toJSONString(param))
.timeout(2000)
.execute().body();
taskId = JSON.parseObject(body, SendPushResult.class).getData().getString("taskId");
} catch (Exception e) {
log.error("PushHandler#createTaskId fail :{},params:{}", Throwables.getStackTraceAsString(e), JSON.toJSONString(pushParam.getTaskInfo()));
}
return taskId;
}
private SendPushParam assembleParam(PushContentModel pushContentModel) {
return assembleParam(pushContentModel, null);
}
private SendPushParam assembleParam(PushContentModel pushContentModel, Set<String> cid) {
SendPushParam param = SendPushParam.builder()
.requestId(String.valueOf(IdUtil.getSnowflake().nextId()))
.pushMessage(SendPushParam.PushMessageVO.builder().notification(SendPushParam.PushMessageVO.NotificationVO.builder()
.title(pushContentModel.getTitle()).body(pushContentModel.getContent()).clickType("startapp").build())
.build())
.build();
if (CollUtil.isNotEmpty(cid)) {
param.setAudience(SendPushParam.AudienceVO.builder().cid(cid).build());
}
return param;
}
}

@ -18,8 +18,9 @@ import org.springframework.stereotype.Component;
/**
* Task
* 0.
* 1.
* 2.
* 2.
* 2.
* 3.
*
* @author 3y
*/

@ -0,0 +1,19 @@
package com.java3y.austin.handler.script;
import com.java3y.austin.handler.domain.wechat.WeChatMiniProgramParam;
/**
* @author sunql
*/
public interface MiniProgramAccountService {
/**
*
*
* @param miniProgramParam
* @return
* @throws Exception
*/
void send(WeChatMiniProgramParam miniProgramParam) throws Exception;
}

@ -1,5 +1,6 @@
package com.java3y.austin.handler.script;
import com.java3y.austin.handler.domain.wechat.WeChatOfficialParam;
import me.chanjar.weixin.mp.bean.template.WxMpTemplateMessage;
import java.util.List;
@ -12,10 +13,10 @@ public interface OfficialAccountService {
/**
*
*
* @param wxMpTemplateMessages
* @param weChatOfficialParam
* @return
* @throws Exception
*/
List<String> send(List<WxMpTemplateMessage> wxMpTemplateMessages) throws Exception;
List<String> send(WeChatOfficialParam weChatOfficialParam) throws Exception;
}

@ -0,0 +1,93 @@
package com.java3y.austin.handler.script.impl;
import cn.binarywang.wx.miniapp.api.WxMaService;
import cn.binarywang.wx.miniapp.api.WxMaSubscribeService;
import cn.binarywang.wx.miniapp.api.impl.WxMaServiceImpl;
import cn.binarywang.wx.miniapp.api.impl.WxMaSubscribeServiceImpl;
import cn.binarywang.wx.miniapp.bean.WxMaSubscribeMessage;
import cn.binarywang.wx.miniapp.config.impl.WxMaDefaultConfigImpl;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.dto.account.WeChatMiniProgramAccount;
import com.java3y.austin.handler.domain.wechat.WeChatMiniProgramParam;
import com.java3y.austin.handler.script.MiniProgramAccountService;
import com.java3y.austin.support.utils.AccountUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author sunql
* @date 20220506 16:41
*/
@Service
@Slf4j
public class MiniProgramAccountServiceImpl implements MiniProgramAccountService {
@Autowired
private AccountUtils accountUtils;
@Override
public void send(WeChatMiniProgramParam miniProgramParam) throws Exception {
WeChatMiniProgramAccount miniProgramAccount = accountUtils.getAccount(miniProgramParam.getSendAccount(),
SendAccountConstant.WECHAT_MINI_PROGRAM_ACCOUNT_KEY,
SendAccountConstant.WECHAT_MINI_PROGRAM_PREFIX,
WeChatMiniProgramAccount.class);
WxMaSubscribeService wxMaSubscribeService = initService(miniProgramAccount);
List<WxMaSubscribeMessage> subscribeMessageList = assembleReq(miniProgramParam, miniProgramAccount);
for (WxMaSubscribeMessage subscribeMessage : subscribeMessageList) {
wxMaSubscribeService.sendSubscribeMsg(subscribeMessage);
}
}
/**
*
*/
private List<WxMaSubscribeMessage> assembleReq(WeChatMiniProgramParam miniProgramParam, WeChatMiniProgramAccount miniProgramAccount) {
Set<String> receiver = miniProgramParam.getOpenIds();
List<WxMaSubscribeMessage> messageList = new ArrayList<>(receiver.size());
// 构建微信小程序订阅消息
for (String openId : receiver) {
WxMaSubscribeMessage subscribeMessage = WxMaSubscribeMessage.builder()
.toUser(openId)
.data(getWxMTemplateData(miniProgramParam.getData()))
.miniprogramState(miniProgramAccount.getMiniProgramState())
.templateId(miniProgramAccount.getTemplateId())
.page(miniProgramAccount.getPage())
.build();
messageList.add(subscribeMessage);
}
return messageList;
}
/**
*
*
* @returnp
*/
private List<WxMaSubscribeMessage.MsgData> getWxMTemplateData(Map<String, String> data) {
List<WxMaSubscribeMessage.MsgData> templateDataList = new ArrayList<>(data.size());
data.forEach((k, v) -> templateDataList.add(new WxMaSubscribeMessage.MsgData(k, v)));
return templateDataList;
}
/**
*
*
* @return
*/
private WxMaSubscribeServiceImpl initService(WeChatMiniProgramAccount miniProgramAccount) {
WxMaService wxMaService = new WxMaServiceImpl();
WxMaDefaultConfigImpl wxMaConfig = new WxMaDefaultConfigImpl();
wxMaConfig.setAppid(miniProgramAccount.getAppId());
wxMaConfig.setSecret(miniProgramAccount.getAppSecret());
wxMaService.setWxMaConfig(wxMaConfig);
return new WxMaSubscribeServiceImpl(wxMaService);
}
}

@ -1,16 +1,23 @@
package com.java3y.austin.handler.script.impl;
import com.java3y.austin.common.constant.SendAccountConstant;
import com.java3y.austin.common.dto.account.WeChatOfficialAccount;
import com.java3y.austin.handler.domain.wechat.WeChatOfficialParam;
import com.java3y.austin.handler.script.OfficialAccountService;
import com.java3y.austin.support.utils.AccountUtils;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.api.impl.WxMpServiceImpl;
import me.chanjar.weixin.mp.bean.template.WxMpTemplateData;
import me.chanjar.weixin.mp.bean.template.WxMpTemplateMessage;
import me.chanjar.weixin.mp.config.impl.WxMpDefaultConfigImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author zyg
@ -19,19 +26,14 @@ import java.util.List;
@Slf4j
public class OfficialAccountServiceImpl implements OfficialAccountService {
@Value("${wx.mp.account.appid}")
private String appId;
@Value("${wx.mp.account.secret}")
private String secret;
@Value("${wx.mp.account.token}")
private String token;
@Value("${wx.mp.account.aesKey}")
private String aesKey;
@Autowired
private AccountUtils accountUtils;
@Override
public List<String> send(List<WxMpTemplateMessage> messages) throws Exception {
WxMpService wxMpService = initService();
public List<String> send(WeChatOfficialParam officialParam) throws Exception {
WeChatOfficialAccount officialAccount = accountUtils.getAccount(officialParam.getSendAccount(), SendAccountConstant.WECHAT_OFFICIAL_ACCOUNT_KEY, SendAccountConstant.WECHAT_OFFICIAL__PREFIX, WeChatOfficialAccount.class);
WxMpService wxMpService = initService(officialAccount);
List<WxMpTemplateMessage> messages = assembleReq(officialParam, officialAccount);
List<String> messageIds = new ArrayList<>(messages.size());
for (WxMpTemplateMessage wxMpTemplateMessage : messages) {
String msgId = wxMpService.getTemplateMsgService().sendTemplateMsg(wxMpTemplateMessage);
@ -40,18 +42,48 @@ public class OfficialAccountServiceImpl implements OfficialAccountService {
return messageIds;
}
/**
*
*/
private List<WxMpTemplateMessage> assembleReq(WeChatOfficialParam officialParam, WeChatOfficialAccount officialAccount) {
Set<String> receiver = officialParam.getOpenIds();
List<WxMpTemplateMessage> wxMpTemplateMessages = new ArrayList<>(receiver.size());
// 构建微信模板消息
for (String openId : receiver) {
WxMpTemplateMessage templateMessage = WxMpTemplateMessage.builder()
.toUser(openId)
.templateId(officialAccount.getTemplateId())
.url(officialAccount.getUrl())
.data(getWxMpTemplateData(officialParam.getData()))
.miniProgram(new WxMpTemplateMessage.MiniProgram(officialAccount.getMiniProgramId(), officialAccount.getPath(), false))
.build();
wxMpTemplateMessages.add(templateMessage);
}
return wxMpTemplateMessages;
}
/**
*
*
* @return
*/
private List<WxMpTemplateData> getWxMpTemplateData(Map<String, String> data) {
List<WxMpTemplateData> templateDataList = new ArrayList<>(data.size());
data.forEach((k, v) -> templateDataList.add(new WxMpTemplateData(k, v)));
return templateDataList;
}
/**
*
*
* @return
*/
public WxMpService initService() {
public WxMpService initService(WeChatOfficialAccount officialAccount) {
WxMpService wxMpService = new WxMpServiceImpl();
WxMpDefaultConfigImpl config = new WxMpDefaultConfigImpl();
config.setAppId(appId);
config.setSecret(secret);
config.setToken(token);
config.setAesKey(aesKey);
config.setAppId(officialAccount.getAppId());
config.setSecret(officialAccount.getSecret());
wxMpService.setWxMpConfigStorage(config);
return wxMpService;
}

@ -44,7 +44,7 @@ public class TencentSmsScript implements SmsScript {
@Override
public List<SmsRecord> send(SmsParam smsParam) throws Exception {
TencentSmsAccount tencentSmsAccount = accountUtils.getAccount(smsParam.getSendAccount(), SendAccountConstant.SMS_ACCOUNT_KEY, SendAccountConstant.SMS_PREFIX, TencentSmsAccount.builder().build());
TencentSmsAccount tencentSmsAccount = accountUtils.getAccount(smsParam.getSendAccount(), SendAccountConstant.SMS_ACCOUNT_KEY, SendAccountConstant.SMS_PREFIX, TencentSmsAccount.class);
SmsClient client = init(tencentSmsAccount);
SendSmsRequest request = assembleReq(smsParam, tencentSmsAccount);
SendSmsResponse response = client.SendSms(request);

@ -11,10 +11,10 @@ import com.java3y.austin.handler.shield.ShieldService;
import com.java3y.austin.support.utils.LogUtils;
import com.java3y.austin.support.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.HashSet;
@ -60,7 +60,7 @@ public class ShieldServiceImpl implements ShieldService {
* @return
*/
private boolean isNight() {
return Integer.valueOf(DateFormatUtils.format(new Date(), "HH")) < 8;
return LocalDateTime.now().getHour() < 8;
}

@ -13,6 +13,7 @@ import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Iterator;
import java.util.List;
@ -25,6 +26,7 @@ import java.util.stream.Collectors;
*
*/
@Slf4j
@Service
public class AfterParamCheckAction implements BusinessProcess<SendTaskModel> {

@ -21,6 +21,7 @@ import com.java3y.austin.support.utils.ContentHolderUtil;
import com.java3y.austin.support.utils.TaskInfoUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.lang.reflect.Field;
import java.util.*;
@ -31,6 +32,7 @@ import java.util.*;
* @description
*/
@Slf4j
@Service
public class AssembleAction implements BusinessProcess<SendTaskModel> {
@Autowired
@ -112,7 +114,8 @@ public class AssembleAction implements BusinessProcess<SendTaskModel> {
if (StrUtil.isNotBlank(originValue)) {
String resultValue = ContentHolderUtil.replacePlaceHolder(originValue, variables);
ReflectUtil.setFieldValue(contentModel, field, resultValue);
Object resultObj = JSON.parseObject(resultValue, field.getType());
ReflectUtil.setFieldValue(contentModel, field, resultObj);
}
}

@ -9,6 +9,7 @@ import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.stream.Collectors;
@ -19,6 +20,7 @@ import java.util.stream.Collectors;
* @description
*/
@Slf4j
@Service
public class PreParamCheckAction implements BusinessProcess<SendTaskModel> {
@Override

@ -13,12 +13,14 @@ import com.java3y.austin.support.utils.KafkaUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* @author 3y
* MQ
*/
@Slf4j
@Service
public class SendMqAction implements BusinessProcess<SendTaskModel> {
@Autowired

@ -6,15 +6,15 @@ import com.java3y.austin.service.api.impl.action.AfterParamCheckAction;
import com.java3y.austin.service.api.impl.action.AssembleAction;
import com.java3y.austin.service.api.impl.action.PreParamCheckAction;
import com.java3y.austin.service.api.impl.action.SendMqAction;
import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessController;
import com.java3y.austin.support.pipeline.ProcessTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
/**
* apipipeline
@ -23,6 +23,14 @@ import java.util.Map;
@Configuration
public class PipelineConfig {
@Autowired
private PreParamCheckAction preParamCheckAction;
@Autowired
private AssembleAction assembleAction;
@Autowired
private AfterParamCheckAction afterParamCheckAction;
@Autowired
private SendMqAction sendMqAction;
/**
*
@ -35,14 +43,8 @@ public class PipelineConfig {
@Bean("commonSendTemplate")
public ProcessTemplate commonSendTemplate() {
ProcessTemplate processTemplate = new ProcessTemplate();
ArrayList<BusinessProcess> processList = new ArrayList<>();
processList.add(preParamCheckAction());
processList.add(assembleAction());
processList.add(afterParamCheckAction());
processList.add(sendMqAction());
processTemplate.setProcessList(processList);
processTemplate.setProcessList(Arrays.asList(preParamCheckAction, assembleAction,
afterParamCheckAction, sendMqAction));
return processTemplate;
}
@ -62,45 +64,4 @@ public class PipelineConfig {
return processController;
}
/**
* Action
*
* @return
*/
@Bean
public AssembleAction assembleAction() {
return new AssembleAction();
}
/**
* Action
*
* @return
*/
@Bean
public PreParamCheckAction preParamCheckAction() {
return new PreParamCheckAction();
}
/**
* Action
*
* @return
*/
@Bean
public AfterParamCheckAction afterParamCheckAction() {
return new AfterParamCheckAction();
}
/**
* MQAction
*
* @return
*/
@Bean
public SendMqAction sendMqAction() {
return new SendMqAction();
}
}

@ -0,0 +1,95 @@
package com.java3y.austin.service.api.impl.service;
import com.java3y.austin.common.enums.RespStatusEnum;
import com.java3y.austin.common.vo.BasicResultVO;
import com.java3y.austin.service.api.domain.BatchSendRequest;
import com.java3y.austin.service.api.domain.MessageParam;
import com.java3y.austin.service.api.domain.SendRequest;
import com.java3y.austin.service.api.domain.SendResponse;
import com.java3y.austin.service.api.enums.BusinessCode;
import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessContext;
import com.java3y.austin.support.pipeline.ProcessController;
import com.java3y.austin.support.pipeline.ProcessTemplate;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class SendServiceImplTest {
@Spy
private ProcessController processController;
@Mock
private Map<String, ProcessTemplate> templateConfig;
@Spy
private ProcessTemplate processTemplate;
@Mock
private BusinessProcess businessProcess;
@InjectMocks
private SendServiceImpl sendServiceImplUnderTest;
@Test
void testSend() {
// params
final SendRequest sendRequest = new SendRequest("send", 1L,
new MessageParam("13711111111", new HashMap<>(), new HashMap<>()));
// predict result
final ProcessContext<SendTaskModel> processContext = new ProcessContext<>(sendRequest.getCode(), new SendTaskModel(), false, new BasicResultVO<>(
RespStatusEnum.SUCCESS, "data"));
final SendResponse expectedResult = new SendResponse(processContext.getResponse().getStatus(), processContext.getResponse().getMsg());
// stub
Map<String, ProcessTemplate> templateConfig = new HashMap<>(4);
processTemplate.setProcessList(Arrays.asList(businessProcess));
templateConfig.put(BusinessCode.COMMON_SEND.getCode(), processTemplate);
processController.setTemplateConfig(templateConfig);
// Run the test
final SendResponse result = sendServiceImplUnderTest.send(sendRequest);
// Verify the results
assertEquals(expectedResult, result);
}
@Test
void testBatchSend() {
// Setup
final BatchSendRequest batchSendRequest = new BatchSendRequest("code", 0L,
Arrays.asList(new MessageParam("receiver", new HashMap<>(), new HashMap<>())));
final SendResponse expectedResult = new SendResponse("status", "msg");
// Configure ProcessController.process(...).
final ProcessContext processContext = new ProcessContext<>("code", null, false, new BasicResultVO<>(
RespStatusEnum.SUCCESS, "data"));
when(processController.process(new ProcessContext<>("code", null, false, new BasicResultVO<>(
RespStatusEnum.SUCCESS, "data")))).thenReturn(processContext);
// Run the test
final SendResponse result = sendServiceImplUnderTest.batchSend(batchSendRequest);
// Verify the results
assertEquals(expectedResult, result);
}
}

@ -22,6 +22,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>

@ -24,16 +24,19 @@ public class AccountUtils {
* (key:smsAccount)[{"sms_10":{"url":"sms.tencentcloudapi.com","region":"ap-guangzhou","secretId":"AKIDhDUUDfffffMEqBF1WljQq","secretKey":"B4h39yWnfffff7D2btue7JErDJ8gxyi","smsSdkAppId":"140025","templateId":"11897","signName":"Java3y公众号","supplierId":10,"supplierName":"腾讯云"}}]
* (key:emailAccount)[{"email_10":{"host":"smtp.qq.com","port":465,"user":"403686131@qq.com","pass":"","from":"403686131@qq.com"}}]
* (key:enterpriseWechatAccount)[{"enterprise_wechat_10":{"corpId":"wwf87603333e00069c","corpSecret":"-IFWxS2222QxzPIorNVUQn144444D915DM","agentId":10044442,"token":"rXROB3333Kf6i","aesKey":"MKZtoFxHIM44444M7ieag3r9ZPUsl"}}]
* (key:dingDingRobotAccount) [{"ding_ding_robot_10":{"secret":"SEC996d8d9d4768aded74114faae924f229229de444475a1c295d64fedf","webhook":"https://oapi.dingtalk.com/robot/send?access_token=8d03b644ffb6534b203d87333367328b0c3003d164715d2c6c6e56"}}]
* (key:dingDingRobotAccount) [{"ding_ding_robot_10":{"secret":"SEC996d8d9d4768aded74114faae924f229229de444475a1c295d64fedf","webhook":"https://oapi.dingtalk.com/robot/send?access_token=8d03b644ffb6534b203d87333367328b0c3003d164715d2c6c6e56"}}]
* (key:dingDingWorkNoticeAccount) [{"ding_ding_work_notice_10":{"appKey":"dingh6yyyyyyycrlbx","appSecret":"tQpvmkR863333yyyyyHP3QHyyyymy9Ao1yoL1oQX5Nlx_fYLLLlpPJWHvWKbTu","agentId":"152333383622"}}]
* (key:officialAccount) [{"official_10":{"appId":"wxecb4693d2eef1ea7","secret":"6240870f4d91701640d769ba20120821","templateId":"JHUk6eE9T5Ts7a5JO3ZQqkBBrZBGn5C9iIiKNDQsk-Q","url":"http://weixin.qq.com/download","miniProgramId":"xiaochengxuappid12345","path":"index?foo=bar"}}]
* (key:miniProgramAccount) [{"mini_program_10":{"appId":"wxecb4693d2eef1ea7","appSecret":"6240870f4d91701640d769ba20120821","templateId":"JHUk6eE9T5Ts7a5JO3ZQqkBBrZBGn5C9iIiKNDQsk-Q","grantType":"client_credential","miniProgramState":"trial","page":"index?foo=bar"}}]
*/
public <T> T getAccount(Integer sendAccount, String apolloKey, String prefix, T t) {
public <T> T getAccount(Integer sendAccount, String apolloKey, String prefix, Class<T> clazz) {
String accountValues = config.getProperty(apolloKey, AustinConstant.APOLLO_DEFAULT_VALUE_JSON_ARRAY);
JSONArray jsonArray = JSON.parseArray(accountValues);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
Object object = jsonObject.getObject(prefix + sendAccount, t.getClass());
T object = jsonObject.getObject(prefix + sendAccount, clazz);
if (object != null) {
return (T) object;
return object;
}
}
return null;

@ -2,10 +2,12 @@ package com.java3y.austin.support.utils;
import cn.hutool.core.collection.CollUtil;
import com.google.common.base.Throwables;
import com.java3y.austin.common.constant.AustinConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@ -93,7 +95,6 @@ public class RedisUtils {
/**
* lpush
*
*/
public void lPush(String key, String value, Long seconds) {
try {
@ -109,7 +110,6 @@ public class RedisUtils {
/**
* lLen
*
*/
public Long lLen(String key) {
try {
@ -119,9 +119,9 @@ public class RedisUtils {
}
return 0L;
}
/**
* lPop
*
*/
public String lPop(String key) {
try {
@ -151,4 +151,33 @@ public class RedisUtils {
log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e));
}
}
/**
* lua
* --KEYS[1]: key
* --ARGV[1]:
* --ARGV[2]: score
* --ARGV[3]:
* --ARGV[4]: score value
*
* @param redisScript
* @param keys
* @param args
* @return
*/
public Boolean execLimitLua(RedisScript<Long> redisScript, List<String> keys, String... args) {
try {
Long execute = redisTemplate.execute(redisScript, keys, args);
return AustinConstant.TRUE.equals(execute.intValue());
} catch (Exception e) {
log.error("redis execLimitLua fail! e:{}", Throwables.getStackTraceAsString(e));
}
return false;
}
}

@ -0,0 +1,108 @@
package com.java3y.austin.support.utils;
/**
* id
*
* @author cao
* @date 2022-04-20 13:12
*/
public class SnowFlakeIdUtils {
/**
* (2017-01-01)
*/
private final static long START_TIMESTAMP = 1483200000000L;
/**
*
*/
private final static long SEQUENCE_BIT = 12; //***占用的位数
private final static long MACHINE_BIT = 5; //机器标识占用的位数
private final static long DATA_CENTER_BIT = 5; //数据中心占用的位数
/**
*
*/
private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
private final static long MAX_DATA_CENTER_NUM = -1L ^ (-1L << DATA_CENTER_BIT);
/**
*
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
private long dataCenterId; //数据中心
private long machineId; //机器标识
private long sequence = 0L; //***
private long lastTimeStamp = -1L; //上一次时间戳
/**
* IDID***
*
* @param dataCenterId ID
* @param machineId ID
*/
public SnowFlakeIdUtils(long dataCenterId, long machineId) {
if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
throw new IllegalArgumentException(String.format("DtaCenterId 不能大于 %d 或小于 0", MAX_DATA_CENTER_NUM));
}
if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException(String.format("MachineId 不能大于 %d 或小于 0", MAX_MACHINE_NUM));
}
this.dataCenterId = dataCenterId;
this.machineId = machineId;
}
/**
* ID
*
* @return
*/
public synchronized long nextId() {
long currTimeStamp = System.currentTimeMillis();
if (currTimeStamp < lastTimeStamp) {
throw new RuntimeException("当前时间小于上一次记录的时间戳!");
}
if (currTimeStamp == lastTimeStamp) {
//相同毫秒内,***自增
sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列数已经达到最大
if (sequence == 0L) {
currTimeStamp = getNextMill();
}
} else {
//不同毫秒内,***置为0
sequence = 0L;
}
lastTimeStamp = currTimeStamp;
return (currTimeStamp - START_TIMESTAMP) << TIMESTAMP_LEFT //时间戳部分
| dataCenterId << DATA_CENTER_LEFT //数据中心部分
| machineId << MACHINE_LEFT //机器标识部分
| sequence; //***部分
}
/**
*
*
* @return
*/
private long getNextMill() {
long mill = System.currentTimeMillis();
while (mill <= lastTimeStamp) {
mill = System.currentTimeMillis();
}
return mill;
}
}

@ -87,10 +87,3 @@ management.endpoint.metrics.enabled=true
management.endpoint.prometheus.enabled=true
management.endpoints.web.exposure.include=*
management.metrics.export.prometheus.enabled=true
##################### wx mp config #####################
##################### TODO not test by 3y,wait to apply for OfficialAccount #####################
wx.mp.account.appid="appid"
wx.mp.account.secret="secret"
wx.mp.account.token="token"
wx.mp.account.aesKey="aesKey"

@ -0,0 +1,17 @@
--KEYS[1]: 限流 key
--ARGV[1]: 限流窗口,毫秒
--ARGV[2]: 当前时间戳作为score
--ARGV[3]: 阈值
--ARGV[4]: score 对应的唯一value
-- 1\. 移除开始时间窗口之前的数据
redis.call('zremrangeByScore', KEYS[1], 0, ARGV[2]-ARGV[1])
-- 2\. 统计当前元素数量
local res = redis.call('zcard', KEYS[1])
-- 3\. 是否超过阈值
if (res == nil) or (res < tonumber(ARGV[3])) then
redis.call('zadd', KEYS[1], ARGV[2], ARGV[4])
redis.call('expire', KEYS[1], ARGV[1]/1000)
return 0
else
return 1
end

@ -160,6 +160,13 @@
<version>${weixin-java}</version>
</dependency>
<!--微信小程序第三方SDK-->
<dependency>
<groupId>com.github.binarywang</groupId>
<artifactId>weixin-java-miniapp</artifactId>
<version>${weixin-java}</version>
</dependency>
<!--动态线程池引入-->
<dependency>
<groupId>io.github.lyh200</groupId>

Loading…
Cancel
Save