From 86bf561e773b3b826b387de4c44f5398c6b922d4 Mon Sep 17 00:00:00 2001 From: 3y Date: Tue, 15 Aug 2023 21:34:09 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E9=92=89=E9=92=89=E6=92=A4=E5=9B=9E?= =?UTF-8?q?=E6=94=AF=E6=8C=81messageId=E7=BB=B4=E5=BA=A6=202.=20=E6=89=93?= =?UTF-8?q?=E5=8D=B0austin=20banner=203.=20docker-compose.yml=20=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0mysql=E9=85=8D=E7=BD=AE=204.=20graylog=20=E5=8F=AA?= =?UTF-8?q?=E5=9C=A8test=E7=8E=AF=E5=A2=83=E6=89=93=E5=8D=B0=205.=20?= =?UTF-8?q?=E9=83=A8=E5=88=86=E5=B8=B8=E9=87=8F=E6=8A=BD=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...nt.java => AccessTokenPrefixConstant.java} | 2 +- .../common/constant/AustinConstant.java | 26 ++++- .../constant/SendChanelUrlConstant.java | 46 +++++++++ .../common/constant/ThreadPoolConstant.java | 7 +- .../austin/common/domain/RecallTaskInfo.java | 39 ++++++++ .../austin/common/domain/SimpleTaskInfo.java | 2 +- .../config/CronAsyncThreadPoolConfig.java | 3 +- .../RefreshDingDingAccessTokenHandler.java | 4 +- .../RefreshGeTuiAccessTokenHandler.java | 5 +- .../AlipayMiniProgramAccountServiceImpl.java | 15 --- .../handler/config/AlipayClientSingleton.java | 3 +- .../austin/handler/handler/Handler.java | 6 +- .../impl/AlipayMiniProgramAccountHandler.java | 5 +- .../handler/impl/DingDingRobotHandler.java | 4 +- .../impl/DingDingWorkNoticeHandler.java | 94 ++++++++++++------- .../handler/handler/impl/EmailHandler.java | 5 +- .../handler/impl/EnterpriseWeChatHandler.java | 6 +- .../impl/EnterpriseWeChatRobotHandler.java | 5 +- .../handler/impl/FeiShuRobotHandler.java | 5 +- .../impl/MiniProgramAccountHandler.java | 5 +- .../handler/impl/OfficialAccountHandler.java | 5 +- .../handler/handler/impl/PushHandler.java | 23 ++--- .../handler/handler/impl/SmsHandler.java | 4 +- .../receiver/eventbus/EventBusReceiver.java | 6 +- .../handler/receiver/kafka/Receiver.java | 6 +- .../receiver/rabbit/RabbitMqReceiver.java | 6 +- .../rocketmq/RocketMqRecallReceiver.java | 6 +- .../receiver/service/ConsumeService.java | 8 +- .../service/impl/ConsumeServiceImpl.java | 8 +- .../SpringEventBusReceiver.java | 6 +- .../SpringEventBusReceiverListener.java | 4 +- .../handler/script/impl/YunPianSmsScript.java | 6 +- .../action/recall/RecallAssembleAction.java | 54 +++++++++++ .../impl/action/recall/RecallMqAction.java | 49 ++++++++++ .../SendAfterCheckAction.java} | 16 ++-- .../SendAssembleAction.java} | 13 +-- .../impl/action/{ => send}/SendMqAction.java | 28 +++--- .../SendPreCheckAction.java} | 12 +-- .../api/impl/config/PipelineConfig.java | 28 ++++-- .../api/impl/domain/RecallTaskModel.java | 37 ++++++++ .../api/impl/domain/SendTaskModel.java | 6 -- .../api/impl/service/RecallServiceImpl.java | 11 +-- .../api/impl/service/SendServiceImpl.java | 8 +- .../api/impl/service/TraceServiceImpl.java | 1 - .../service/api/domain/SendRequest.java | 9 ++ .../service/api/service/RecallService.java | 4 +- .../service/api/service/SendService.java | 4 +- .../service/api/service/TraceService.java | 2 + .../java3y/austin/stream/sink/AustinSink.java | 16 +--- .../config/SupportThreadPoolConfig.java | 3 +- .../support/mq/eventbus/EventBusListener.java | 6 +- .../eventbus/EventBusSendMqServiceImpl.java | 3 +- .../support/utils/AccessTokenUtils.java | 12 +-- .../austin/support/utils/AccountUtils.java | 8 +- .../austin/support/utils/AustinFileUtils.java | 2 +- .../austin/support/utils/TaskInfoUtils.java | 2 + .../com/java3y/austin/AustinApplication.java | 23 ++++- .../web/controller/MiniProgramController.java | 4 +- .../web/service/impl/DataServiceImpl.java | 2 +- .../web/service/impl/MaterialServiceImpl.java | 13 +-- austin-web/src/main/resources/logback.xml | 27 ++++-- doc/docker/mysql/mysql.cnf | 5 + docker-compose.yml | 2 + 63 files changed, 542 insertions(+), 243 deletions(-) rename austin-common/src/main/java/com/java3y/austin/common/constant/{SendAccountConstant.java => AccessTokenPrefixConstant.java} (93%) create mode 100644 austin-common/src/main/java/com/java3y/austin/common/constant/SendChanelUrlConstant.java create mode 100644 austin-common/src/main/java/com/java3y/austin/common/domain/RecallTaskInfo.java create mode 100644 austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/recall/RecallAssembleAction.java create mode 100644 austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/recall/RecallMqAction.java rename austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/{AfterParamCheckAction.java => send/SendAfterCheckAction.java} (86%) rename austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/{AssembleAction.java => send/SendAssembleAction.java} (90%) rename austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/{ => send}/SendMqAction.java (62%) rename austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/{PreParamCheckAction.java => send/SendPreCheckAction.java} (85%) create mode 100644 austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/RecallTaskModel.java create mode 100644 doc/docker/mysql/mysql.cnf diff --git a/austin-common/src/main/java/com/java3y/austin/common/constant/SendAccountConstant.java b/austin-common/src/main/java/com/java3y/austin/common/constant/AccessTokenPrefixConstant.java similarity index 93% rename from austin-common/src/main/java/com/java3y/austin/common/constant/SendAccountConstant.java rename to austin-common/src/main/java/com/java3y/austin/common/constant/AccessTokenPrefixConstant.java index c881f00..61fe588 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/constant/SendAccountConstant.java +++ b/austin-common/src/main/java/com/java3y/austin/common/constant/AccessTokenPrefixConstant.java @@ -6,7 +6,7 @@ package com.java3y.austin.common.constant; * * @author 3y */ -public class SendAccountConstant { +public class AccessTokenPrefixConstant { /** * 钉钉 工作应用消息 账号 diff --git a/austin-common/src/main/java/com/java3y/austin/common/constant/AustinConstant.java b/austin-common/src/main/java/com/java3y/austin/common/constant/AustinConstant.java index 4de52b3..9ba45b4 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/constant/AustinConstant.java +++ b/austin-common/src/main/java/com/java3y/austin/common/constant/AustinConstant.java @@ -27,18 +27,38 @@ public class AustinConstant { */ public static final String SEND_ALL = "@all"; - public static final String CACHE_KEY_PREFIX = "Austin"; - + /** + * 链路追踪缓存的key标识 + */ + public static final String CACHE_KEY_PREFIX = "Austin"; public static final String MESSAGE_ID = "MessageId"; /** - * 默认的常量,如果新建模板/账号时,没传入则用该常量 + * 消息模板常量; + * 如果新建模板/账号时,没传入则用该常量 */ public static final String DEFAULT_CREATOR = "Java3y"; public static final String DEFAULT_UPDATOR = "Java3y"; public static final String DEFAULT_TEAM = "Java3y公众号"; public static final String DEFAULT_AUDITOR = "Java3y"; + /** + * 项目打印常量 + */ + public static final String PROJECT_NAME = " :: Austin :: "; + public static final String PROJECT_BANNER = "\n" + + " .----------------. .----------------. .----------------. .----------------. .----------------. .-----------------.\n" + + "| .--------------. || .--------------. || .--------------. || .--------------. || .--------------. || .--------------. |\n" + + "| | __ | || | _____ _____ | || | _______ | || | _________ | || | _____ | || | ____ _____ | |\n" + + "| | / \\ | || ||_ _||_ _|| || | / ___ | | || | | _ _ | | || | |_ _| | || ||_ \\|_ _| | |\n" + + "| | / /\\ \\ | || | | | | | | || | | (__ \\_| | || | |_/ | | \\_| | || | | | | || | | \\ | | | |\n" + + "| | / ____ \\ | || | | ' ' | | || | '.___`-. | || | | | | || | | | | || | | |\\ \\| | | |\n" + + "| | _/ / \\ \\_ | || | \\ `--' / | || | |`\\____) | | || | _| |_ | || | _| |_ | || | _| |_\\ |_ | |\n" + + "| ||____| |____|| || | `.__.' | || | |_______.' | || | |_____| | || | |_____| | || ||_____|\\____| | |\n" + + "| | | || | | || | | || | | || | | || | | |\n" + + "| '--------------' || '--------------' || '--------------' || '--------------' || '--------------' || '--------------' |\n" + + " '----------------' '----------------' '----------------' '----------------' '----------------' '----------------' \n"; + } diff --git a/austin-common/src/main/java/com/java3y/austin/common/constant/SendChanelUrlConstant.java b/austin-common/src/main/java/com/java3y/austin/common/constant/SendChanelUrlConstant.java new file mode 100644 index 0000000..8857d45 --- /dev/null +++ b/austin-common/src/main/java/com/java3y/austin/common/constant/SendChanelUrlConstant.java @@ -0,0 +1,46 @@ +package com.java3y.austin.common.constant; + + +/** + * 发送渠道的URL常量,统一维护发送地址 + * + * @author 3y + */ +public class SendChanelUrlConstant { + + /** + * 个推相关的url + */ + public static final String GE_TUI_BASE_URL = "https://restapi.getui.com/v2/"; + public static final String GE_TUI_SINGLE_PUSH_PATH = "/push/single/cid"; + public static final String GE_TUI_BATCH_PUSH_CREATE_TASK_PATH = "/push/list/message"; + public static final String GE_TUI_BATCH_PUSH_PATH = "/push/list/cid"; + public static final String GE_TUI_AUTH = "/auth"; + + + /** + * 钉钉工作消息相关的url + */ + public static final String DING_DING_SEND_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"; + public static final String DING_DING_RECALL_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/recall"; + public static final String DING_DING_PULL_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/getsendresult"; + public static final String DING_DING_UPLOAD_URL = "https://oapi.dingtalk.com/media/upload"; + public static final String DING_DING_TOKEN_URL = "https://oapi.dingtalk.com/gettoken"; + + /** + * 企业微信机器人相关的url + */ + public static final String ENTERPRISE_WE_CHAT_ROBOT_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key=&type="; + + /** + * 支付宝小程序相关的url + */ + public static final String ALI_MINI_PROGRAM_GATEWAY_URL = "https://openapi.alipaydev.com/gateway.do"; + + /** + * 微信小程序相关的url + */ + public static final String WE_CHAT_MINI_PROGRAM_OPENID_SYNC = "https://api.weixin.qq.com/sns/jscode2session?appid=&secret=&js_code=&grant_type=authorization_code"; + + +} diff --git a/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java b/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java index 73f40de..49b4eb6 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java +++ b/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java @@ -1,8 +1,5 @@ package com.java3y.austin.common.constant; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - /** * 线程池常见的常量信息 * (仅供初始化的使用,代码里的线程池配置有可能被apollo动态修改) @@ -29,9 +26,7 @@ public class ThreadPoolConstant { /** - * big + * big queue size */ public static final Integer BIG_QUEUE_SIZE = 1024; - public static final BlockingQueue BIG_BLOCKING_QUEUE = new LinkedBlockingQueue(BIG_QUEUE_SIZE); - } diff --git a/austin-common/src/main/java/com/java3y/austin/common/domain/RecallTaskInfo.java b/austin-common/src/main/java/com/java3y/austin/common/domain/RecallTaskInfo.java new file mode 100644 index 0000000..cbbb708 --- /dev/null +++ b/austin-common/src/main/java/com/java3y/austin/common/domain/RecallTaskInfo.java @@ -0,0 +1,39 @@ +package com.java3y.austin.common.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * 撤回任务信息 + * + * @author 3y + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class RecallTaskInfo { + /** + * 消息模板Id + */ + private Long messageTemplateId; + + /** + * 需要撤回的消息ids + */ + private List recallMessageId; + + /** + * 发送账号 + */ + private Integer sendAccount; + + /** + * 发送渠道 + */ + private Integer sendChannel; +} diff --git a/austin-common/src/main/java/com/java3y/austin/common/domain/SimpleTaskInfo.java b/austin-common/src/main/java/com/java3y/austin/common/domain/SimpleTaskInfo.java index a82457e..e9eb7ee 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/domain/SimpleTaskInfo.java +++ b/austin-common/src/main/java/com/java3y/austin/common/domain/SimpleTaskInfo.java @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; /** * @Author: sky * @Date: 2023/7/13 10:43 - * @Description: SimpleTaskInfo + * // * @Description: SimpleTaskInfo 调用发送接口成功后返回对应的信息,用于查看下发情况 * @Version 1.0.0 */ @Data diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/config/CronAsyncThreadPoolConfig.java b/austin-cron/src/main/java/com/java3y/austin/cron/config/CronAsyncThreadPoolConfig.java index 19fea23..4749fd4 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/config/CronAsyncThreadPoolConfig.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/config/CronAsyncThreadPoolConfig.java @@ -8,6 +8,7 @@ import com.dtp.core.thread.ThreadPoolBuilder; import com.java3y.austin.common.constant.ThreadPoolConstant; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -34,7 +35,7 @@ public class CronAsyncThreadPoolConfig { return ExecutorBuilder.create() .setCorePoolSize(ThreadPoolConstant.COMMON_CORE_POOL_SIZE) .setMaxPoolSize(ThreadPoolConstant.COMMON_MAX_POOL_SIZE) - .setWorkQueue(ThreadPoolConstant.BIG_BLOCKING_QUEUE) + .setWorkQueue(new LinkedBlockingQueue(ThreadPoolConstant.BIG_QUEUE_SIZE)) .setHandler(new ThreadPoolExecutor.CallerRunsPolicy()) .setAllowCoreThreadTimeOut(true) .setKeepAliveTime(ThreadPoolConstant.SMALL_KEEP_LIVE_TIME, TimeUnit.SECONDS) diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshDingDingAccessTokenHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshDingDingAccessTokenHandler.java index 1e86ab7..c908130 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshDingDingAccessTokenHandler.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshDingDingAccessTokenHandler.java @@ -2,8 +2,8 @@ package com.java3y.austin.cron.handler; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.constant.AccessTokenPrefixConstant; import com.java3y.austin.common.constant.CommonConstant; -import com.java3y.austin.common.constant.SendAccountConstant; import com.java3y.austin.common.dto.account.DingDingWorkNoticeAccount; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.support.config.SupportThreadPoolConfig; @@ -49,7 +49,7 @@ public class RefreshDingDingAccessTokenHandler { DingDingWorkNoticeAccount account = JSON.parseObject(channelAccount.getAccountConfig(), DingDingWorkNoticeAccount.class); String accessToken = AccessTokenUtils.getDingDingAccessToken(account); if (StrUtil.isNotBlank(accessToken)) { - redisTemplate.opsForValue().set(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + channelAccount.getId(), accessToken); + redisTemplate.opsForValue().set(AccessTokenPrefixConstant.DING_DING_ACCESS_TOKEN_PREFIX + channelAccount.getId(), accessToken); } } }); diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshGeTuiAccessTokenHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshGeTuiAccessTokenHandler.java index 320258b..300ebbb 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshGeTuiAccessTokenHandler.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/handler/RefreshGeTuiAccessTokenHandler.java @@ -2,8 +2,8 @@ package com.java3y.austin.cron.handler; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.constant.AccessTokenPrefixConstant; import com.java3y.austin.common.constant.CommonConstant; -import com.java3y.austin.common.constant.SendAccountConstant; import com.java3y.austin.common.dto.account.GeTuiAccount; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.support.config.SupportThreadPoolConfig; @@ -49,12 +49,11 @@ public class RefreshGeTuiAccessTokenHandler { GeTuiAccount account = JSON.parseObject(channelAccount.getAccountConfig(), GeTuiAccount.class); String accessToken = AccessTokenUtils.getGeTuiAccessToken(account); if (StrUtil.isNotBlank(accessToken)) { - redisTemplate.opsForValue().set(SendAccountConstant.GE_TUI_ACCESS_TOKEN_PREFIX + channelAccount.getId(), accessToken); + redisTemplate.opsForValue().set(AccessTokenPrefixConstant.GE_TUI_ACCESS_TOKEN_PREFIX + channelAccount.getId(), accessToken); } } }); } - } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/alipay/impl/AlipayMiniProgramAccountServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/alipay/impl/AlipayMiniProgramAccountServiceImpl.java index 84caa4b..3bca3fc 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/alipay/impl/AlipayMiniProgramAccountServiceImpl.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/alipay/impl/AlipayMiniProgramAccountServiceImpl.java @@ -65,20 +65,5 @@ public class AlipayMiniProgramAccountServiceImpl implements AlipayMiniProgramAcc return requestList; } -// /** -// * 初始化支付宝小程序 -// */ -// private AlipayClient initService(AlipayMiniProgramAccount alipayMiniProgramAccount) throws AlipayApiException { -// AlipayConfig alipayConfig = new AlipayConfig(); -// alipayConfig.setServerUrl("https://openapi.alipaydev.com/gateway.do"); -// alipayConfig.setAppId(alipayMiniProgramAccount.getAppId()); -// alipayConfig.setPrivateKey(alipayMiniProgramAccount.getPrivateKey()); -// alipayConfig.setFormat("json"); -// alipayConfig.setAlipayPublicKey(alipayMiniProgramAccount.getAlipayPublicKey()); -// alipayConfig.setCharset("utf-8"); -// alipayConfig.setSignType("RSA2"); -// return new DefaultAlipayClient(alipayConfig); -// } - } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/config/AlipayClientSingleton.java b/austin-handler/src/main/java/com/java3y/austin/handler/config/AlipayClientSingleton.java index 13e04be..c2ad036 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/config/AlipayClientSingleton.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/config/AlipayClientSingleton.java @@ -3,6 +3,7 @@ package com.java3y.austin.handler.config; import com.alipay.api.AlipayApiException; import com.alipay.api.AlipayConfig; import com.alipay.api.DefaultAlipayClient; +import com.java3y.austin.common.constant.SendChanelUrlConstant; import com.java3y.austin.common.dto.account.AlipayMiniProgramAccount; import java.util.HashMap; @@ -28,7 +29,7 @@ public class AlipayClientSingleton { synchronized (DefaultAlipayClient.class) { if (!alipayClientMap.containsKey(alipayMiniProgramAccount.getAppId())) { AlipayConfig alipayConfig = new AlipayConfig(); - alipayConfig.setServerUrl("https://openapi.alipaydev.com/gateway.do"); + alipayConfig.setServerUrl(SendChanelUrlConstant.ALI_MINI_PROGRAM_GATEWAY_URL); alipayConfig.setAppId(alipayMiniProgramAccount.getAppId()); alipayConfig.setPrivateKey(alipayMiniProgramAccount.getPrivateKey()); alipayConfig.setFormat("json"); diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/Handler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/Handler.java index 88291b5..35ff5b6 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/Handler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/Handler.java @@ -1,7 +1,7 @@ package com.java3y.austin.handler.handler; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; -import com.java3y.austin.support.domain.MessageTemplate; /** * @author 3y @@ -19,9 +19,9 @@ public interface Handler { /** * 撤回消息 * - * @param messageTemplate + * @param recallTaskInfo * @return */ - void recall(MessageTemplate messageTemplate); + void recall(RecallTaskInfo recallTaskInfo); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/AlipayMiniProgramAccountHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/AlipayMiniProgramAccountHandler.java index 651ea47..fd8a89e 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/AlipayMiniProgramAccountHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/AlipayMiniProgramAccountHandler.java @@ -2,6 +2,7 @@ package com.java3y.austin.handler.handler.impl; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.model.AlipayMiniProgramContentModel; import com.java3y.austin.common.enums.ChannelType; @@ -9,7 +10,6 @@ import com.java3y.austin.handler.alipay.AlipayMiniProgramAccountService; import com.java3y.austin.handler.domain.alipay.AlipayMiniProgramParam; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; -import com.java3y.austin.support.domain.MessageTemplate; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -60,8 +60,9 @@ public class AlipayMiniProgramAccountHandler extends BaseHandler implements Hand return param; } + @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java index d8dbcb2..f8e182f 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java @@ -7,6 +7,7 @@ import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.constant.CommonConstant; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.account.DingDingRobotAccount; import com.java3y.austin.common.dto.model.DingDingRobotContentModel; @@ -16,7 +17,6 @@ import com.java3y.austin.handler.domain.dingding.DingDingRobotParam; import com.java3y.austin.handler.domain.dingding.DingDingRobotResult; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.binary.Base64; @@ -137,7 +137,7 @@ public class DingDingRobotHandler extends BaseHandler implements Handler { @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java index fdc9ada..a333fba 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java @@ -13,9 +13,11 @@ import com.dingtalk.api.response.OapiMessageCorpconversationAsyncsendV2Response; import com.dingtalk.api.response.OapiMessageCorpconversationGetsendresultResponse; import com.dingtalk.api.response.OapiMessageCorpconversationRecallResponse; import com.google.common.base.Throwables; +import com.java3y.austin.common.constant.AccessTokenPrefixConstant; import com.java3y.austin.common.constant.AustinConstant; -import com.java3y.austin.common.constant.SendAccountConstant; +import com.java3y.austin.common.constant.SendChanelUrlConstant; import com.java3y.austin.common.domain.LogParam; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.account.DingDingWorkNoticeAccount; import com.java3y.austin.common.dto.model.DingDingWorkContentModel; @@ -24,10 +26,10 @@ import com.java3y.austin.common.enums.SendMessageType; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.support.config.SupportThreadPoolConfig; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccessTokenUtils; import com.java3y.austin.support.utils.AccountUtils; import com.java3y.austin.support.utils.LogUtils; +import com.taobao.api.ApiException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -60,9 +62,6 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler { channelCode = ChannelType.DING_DING_WORK_NOTICE.getCode(); } - private static final String SEND_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"; - private static final String RECALL_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/recall"; - private static final String PULL_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/getsendresult"; private static final String DING_DING_RECALL_KEY_PREFIX = "RECALL_"; private static final String RECALL_BIZ_TYPE = "DingDingWorkNoticeHandler#recall"; @@ -72,12 +71,14 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler { DingDingWorkNoticeAccount account = accountUtils.getAccountById(taskInfo.getSendAccount(), DingDingWorkNoticeAccount.class); OapiMessageCorpconversationAsyncsendV2Request request = assembleParam(account, taskInfo); String accessToken = getAccessToken(account, Long.valueOf(taskInfo.getSendAccount())); - OapiMessageCorpconversationAsyncsendV2Response response = new DefaultDingTalkClient(SEND_URL).execute(request, accessToken); + OapiMessageCorpconversationAsyncsendV2Response response = new DefaultDingTalkClient(SendChanelUrlConstant.DING_DING_SEND_URL).execute(request, accessToken); // 发送成功后记录TaskId,用于消息撤回(支持当天的) if (response.getErrcode() == 0) { redisTemplate.opsForList().leftPush(DING_DING_RECALL_KEY_PREFIX + taskInfo.getMessageTemplateId(), String.valueOf(response.getTaskId())); + redisTemplate.opsForValue().set(DING_DING_RECALL_KEY_PREFIX + taskInfo.getMessageId(), String.valueOf(response.getTaskId())); redisTemplate.expire(DING_DING_RECALL_KEY_PREFIX + taskInfo.getMessageTemplateId(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000, TimeUnit.SECONDS); + redisTemplate.expire(DING_DING_RECALL_KEY_PREFIX + taskInfo.getMessageId(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000, TimeUnit.SECONDS); return true; } @@ -172,27 +173,49 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler { return req; } + /** + * 拉取回执 + */ + public void pull(Long accountId) { + try { + DingDingWorkNoticeAccount account = accountUtils.getAccountById(accountId.intValue(), DingDingWorkNoticeAccount.class); + String accessToken = getAccessToken(account, accountId); + DingTalkClient client = new DefaultDingTalkClient(SendChanelUrlConstant.DING_DING_PULL_URL); + OapiMessageCorpconversationGetsendresultRequest req = new OapiMessageCorpconversationGetsendresultRequest(); + req.setAgentId(Long.valueOf(account.getAgentId())); + req.setTaskId(456L); + OapiMessageCorpconversationGetsendresultResponse rsp = client.execute(req, accessToken); + } catch (Exception e) { + log.error("DingDingWorkNoticeHandler#pull fail:{}", Throwables.getStackTraceAsString(e)); + } + } + /** - * 在下发的时候存储了messageTemplate -> taskIdList - * 只要还存在taskIdList,则将其去除 + * 在下发的时候存储了 + * messageTemplate -> taskIdList + * messageId -> taskIdList + *

+ * 在有效期内的taskIdList,优先撤回messageId,如果未传入messageId,则按照模板id撤回 * - * @param messageTemplate + * @param recallTaskInfo */ @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> { try { - DingDingWorkNoticeAccount account = accountUtils.getAccountById(messageTemplate.getSendAccount(), DingDingWorkNoticeAccount.class); - String accessToken = getAccessToken(account, Long.valueOf(messageTemplate.getSendAccount())); - while (redisTemplate.opsForList().size(DING_DING_RECALL_KEY_PREFIX + messageTemplate.getId()) > 0) { - String taskId = redisTemplate.opsForList().leftPop(DING_DING_RECALL_KEY_PREFIX + messageTemplate.getId()); - DingTalkClient client = new DefaultDingTalkClient(RECALL_URL); - OapiMessageCorpconversationRecallRequest req = new OapiMessageCorpconversationRecallRequest(); - req.setAgentId(Long.valueOf(account.getAgentId())); - req.setMsgTaskId(Long.valueOf(taskId)); - OapiMessageCorpconversationRecallResponse rsp = client.execute(req, accessToken); - logUtils.print(LogParam.builder().bizType(RECALL_BIZ_TYPE).object(JSON.toJSONString(rsp)).build()); + DingDingWorkNoticeAccount account = accountUtils.getAccountById(recallTaskInfo.getSendAccount(), DingDingWorkNoticeAccount.class); + String accessToken = getAccessToken(account, Long.valueOf(recallTaskInfo.getSendAccount())); + + // 优先去除messageId,如果未传入messageId,则按照模板id去除 + if (CollUtil.isNotEmpty(recallTaskInfo.getRecallMessageId())) { + String taskId = redisTemplate.opsForValue().get(DING_DING_RECALL_KEY_PREFIX + recallTaskInfo.getMessageTemplateId()); + recallBiz(account, accessToken, taskId); + } else { + while (redisTemplate.opsForList().size(DING_DING_RECALL_KEY_PREFIX + recallTaskInfo.getMessageTemplateId()) > 0) { + String taskId = redisTemplate.opsForList().leftPop(DING_DING_RECALL_KEY_PREFIX + recallTaskInfo.getMessageTemplateId()); + recallBiz(account, accessToken, taskId); + } } } catch (Exception e) { log.error("DingDingWorkNoticeHandler#recall fail:{}", Throwables.getStackTraceAsString(e)); @@ -201,20 +224,20 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler { } /** - * 拉取回执 + * 调用 钉钉api 撤回消息 + * + * @param account + * @param accessToken + * @param taskId + * @throws ApiException */ - public void pull(Long accountId) { - try { - DingDingWorkNoticeAccount account = accountUtils.getAccountById(accountId.intValue(), DingDingWorkNoticeAccount.class); - String accessToken = getAccessToken(account, accountId); - DingTalkClient client = new DefaultDingTalkClient(PULL_URL); - OapiMessageCorpconversationGetsendresultRequest req = new OapiMessageCorpconversationGetsendresultRequest(); - req.setAgentId(Long.valueOf(account.getAgentId())); - req.setTaskId(456L); - OapiMessageCorpconversationGetsendresultResponse rsp = client.execute(req, accessToken); - } catch (Exception e) { - log.error("DingDingWorkNoticeHandler#pull fail:{}", Throwables.getStackTraceAsString(e)); - } + private void recallBiz(DingDingWorkNoticeAccount account, String accessToken, String taskId) throws ApiException { + DingTalkClient client = new DefaultDingTalkClient(SendChanelUrlConstant.DING_DING_RECALL_URL); + OapiMessageCorpconversationRecallRequest req = new OapiMessageCorpconversationRecallRequest(); + req.setAgentId(Long.valueOf(account.getAgentId())); + req.setMsgTaskId(Long.valueOf(taskId)); + OapiMessageCorpconversationRecallResponse rsp = client.execute(req, accessToken); + logUtils.print(LogParam.builder().bizType(RECALL_BIZ_TYPE).object(JSON.toJSONString(rsp)).build()); } /** @@ -225,17 +248,18 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler { * @return token */ private String getAccessToken(DingDingWorkNoticeAccount account, Long accountId) { - String accessToken = redisTemplate.opsForValue().get(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + accountId); + String accessToken = redisTemplate.opsForValue().get(AccessTokenPrefixConstant.DING_DING_ACCESS_TOKEN_PREFIX + accountId); if (StrUtil.isNotBlank(accessToken)) { return accessToken; } accessToken = AccessTokenUtils.getDingDingAccessToken(account); if (StrUtil.isNotBlank(accessToken)) { - redisTemplate.opsForValue().set(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + accountId, accessToken); + redisTemplate.opsForValue().set(AccessTokenPrefixConstant.DING_DING_ACCESS_TOKEN_PREFIX + accountId, accessToken); } else { log.error("DingDingWorkNoticeHandler#getAccessToken fail accessToken{} accountId{} ", accessToken, accountId); } return accessToken; } + } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java index 40187ee..335208a 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java @@ -7,6 +7,7 @@ import cn.hutool.extra.mail.MailAccount; import cn.hutool.extra.mail.MailUtil; import com.google.common.base.Throwables; import com.google.common.util.concurrent.RateLimiter; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.model.EmailContentModel; import com.java3y.austin.common.enums.ChannelType; @@ -14,7 +15,6 @@ import com.java3y.austin.handler.enums.RateLimitStrategy; import com.java3y.austin.handler.flowcontrol.FlowControlParam; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import com.java3y.austin.support.utils.AustinFileUtils; import com.sun.mail.util.MailSSLSocketFactory; @@ -85,8 +85,9 @@ public class EmailHandler extends BaseHandler implements Handler { return account; } + @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java index e319757..9e2c136 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java @@ -5,13 +5,13 @@ import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.constant.CommonConstant; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.model.EnterpriseWeChatContentModel; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.SendMessageType; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.common.error.WxMpErrorMsgEnum; @@ -126,10 +126,10 @@ public class EnterpriseWeChatHandler extends BaseHandler implements Handler { return wxCpMessage; } + @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } - } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatRobotHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatRobotHandler.java index fb72fc1..d6b3d34 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatRobotHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatRobotHandler.java @@ -5,6 +5,7 @@ import cn.hutool.http.Header; import cn.hutool.http.HttpRequest; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.account.EnterpriseWeChatRobotAccount; import com.java3y.austin.common.dto.model.EnterpriseWeChatRobotContentModel; @@ -14,7 +15,6 @@ import com.java3y.austin.handler.domain.wechat.robot.EnterpriseWeChatRobotParam; import com.java3y.austin.handler.domain.wechat.robot.EnterpriseWeChatRootResult; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -85,8 +85,9 @@ public class EnterpriseWeChatRobotHandler extends BaseHandler implements Handler return param; } + @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/FeiShuRobotHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/FeiShuRobotHandler.java index 722b5ce..31748c5 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/FeiShuRobotHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/FeiShuRobotHandler.java @@ -5,6 +5,7 @@ import cn.hutool.http.Header; import cn.hutool.http.HttpRequest; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.account.FeiShuRobotAccount; import com.java3y.austin.common.dto.model.FeiShuRobotContentModel; @@ -14,7 +15,6 @@ import com.java3y.austin.handler.domain.feishu.FeiShuRobotParam; import com.java3y.austin.handler.domain.feishu.FeiShuRobotResult; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -90,8 +90,9 @@ public class FeiShuRobotHandler extends BaseHandler implements Handler { return param; } + @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java index e754286..8539e5b 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java @@ -4,12 +4,12 @@ import cn.binarywang.wx.miniapp.api.WxMaService; import cn.binarywang.wx.miniapp.bean.WxMaSubscribeMessage; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.model.MiniProgramContentModel; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -77,8 +77,9 @@ public class MiniProgramAccountHandler extends BaseHandler implements Handler { return templateDataList; } + @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java index 79461fb..fcd1e37 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java @@ -2,12 +2,12 @@ package com.java3y.austin.handler.handler.impl; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.model.OfficialAccountsContentModel; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.mp.api.WxMpService; @@ -86,8 +86,9 @@ public class OfficialAccountHandler extends BaseHandler implements Handler { return templateDataList; } + @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java index 9d528f4..ce7bcf1 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java @@ -8,7 +8,9 @@ import cn.hutool.http.Header; import cn.hutool.http.HttpRequest; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; -import com.java3y.austin.common.constant.SendAccountConstant; +import com.java3y.austin.common.constant.AccessTokenPrefixConstant; +import com.java3y.austin.common.constant.SendChanelUrlConstant; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.account.GeTuiAccount; import com.java3y.austin.common.dto.model.PushContentModel; @@ -19,7 +21,6 @@ import com.java3y.austin.handler.domain.push.getui.SendPushParam; import com.java3y.austin.handler.domain.push.getui.SendPushResult; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccessTokenUtils; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; @@ -40,11 +41,6 @@ import java.util.Set; @Slf4j public class PushHandler extends BaseHandler implements Handler { - private static final String BASE_URL = "https://restapi.getui.com/v2/"; - private static final String SINGLE_PUSH_PATH = "/push/single/cid"; - private static final String BATCH_PUSH_CREATE_TASK_PATH = "/push/list/message"; - private static final String BATCH_PUSH_PATH = "/push/list/cid"; - public PushHandler() { channelCode = ChannelType.PUSH.getCode(); } @@ -89,7 +85,7 @@ public class PushHandler extends BaseHandler implements Handler { * @return http result */ private String singlePush(PushParam pushParam) { - String url = BASE_URL + pushParam.getAppId() + SINGLE_PUSH_PATH; + String url = SendChanelUrlConstant.GE_TUI_BASE_URL + pushParam.getAppId() + SendChanelUrlConstant.GE_TUI_SINGLE_PUSH_PATH; SendPushParam sendPushParam = assembleParam((PushContentModel) pushParam.getTaskInfo().getContentModel(), pushParam.getTaskInfo().getReceiver()); String body = HttpRequest.post(url).header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue()) .header("token", pushParam.getToken()) @@ -108,7 +104,7 @@ public class PushHandler extends BaseHandler implements Handler { * @return */ private String batchPush(String taskId, PushParam pushParam) { - String url = BASE_URL + pushParam.getAppId() + BATCH_PUSH_PATH; + String url = SendChanelUrlConstant.GE_TUI_BASE_URL + pushParam.getAppId() + SendChanelUrlConstant.GE_TUI_BATCH_PUSH_PATH; BatchSendPushParam batchSendPushParam = BatchSendPushParam.builder() .taskId(taskId) .isAsync(true) @@ -129,7 +125,7 @@ public class PushHandler extends BaseHandler implements Handler { * @return http result */ private String createTaskId(PushParam pushParam) { - String url = BASE_URL + pushParam.getAppId() + BATCH_PUSH_CREATE_TASK_PATH; + String url = SendChanelUrlConstant.GE_TUI_BASE_URL + pushParam.getAppId() + SendChanelUrlConstant.GE_TUI_BATCH_PUSH_CREATE_TASK_PATH; SendPushParam param = assembleParam((PushContentModel) pushParam.getTaskInfo().getContentModel()); String taskId = ""; try { @@ -155,13 +151,13 @@ public class PushHandler extends BaseHandler implements Handler { * @return token */ private String getAccessToken(TaskInfo taskInfo, GeTuiAccount account) { - String token = redisTemplate.opsForValue().get(SendAccountConstant.GE_TUI_ACCESS_TOKEN_PREFIX + taskInfo.getSendAccount()); + String token = redisTemplate.opsForValue().get(AccessTokenPrefixConstant.GE_TUI_ACCESS_TOKEN_PREFIX + taskInfo.getSendAccount()); if (StrUtil.isNotBlank(token)) { return token; } token = AccessTokenUtils.getGeTuiAccessToken(account); if (StrUtil.isNotBlank(token)) { - redisTemplate.opsForValue().set(SendAccountConstant.GE_TUI_ACCESS_TOKEN_PREFIX + taskInfo.getSendAccount(), token); + redisTemplate.opsForValue().set(AccessTokenPrefixConstant.GE_TUI_ACCESS_TOKEN_PREFIX + taskInfo.getSendAccount(), token); } else { log.error("PushHandler#getAccessToken fail taskInfo:{} account:{}", taskInfo, account); } @@ -185,8 +181,9 @@ public class PushHandler extends BaseHandler implements Handler { return param; } + @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java index 9129922..c4afc85 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.google.common.base.Throwables; import com.java3y.austin.common.constant.CommonConstant; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.account.sms.SmsAccount; import com.java3y.austin.common.dto.model.SmsContentModel; @@ -16,7 +17,6 @@ import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.handler.script.SmsScript; import com.java3y.austin.support.dao.SmsRecordDao; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.domain.SmsRecord; import com.java3y.austin.support.service.ConfigService; import com.java3y.austin.support.utils.AccountUtils; @@ -182,7 +182,7 @@ public class SmsHandler extends BaseHandler implements Handler { } @Override - public void recall(MessageTemplate messageTemplate) { + public void recall(RecallTaskInfo recallTaskInfo) { } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/eventbus/EventBusReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/eventbus/EventBusReceiver.java index e996361..3274362 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/eventbus/EventBusReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/eventbus/EventBusReceiver.java @@ -1,10 +1,10 @@ package com.java3y.austin.handler.receiver.eventbus; import com.google.common.eventbus.Subscribe; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.support.constans.MessageQueuePipeline; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.mq.eventbus.EventBusListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -31,7 +31,7 @@ public class EventBusReceiver implements EventBusListener { @Override @Subscribe - public void recall(MessageTemplate messageTemplate) { - consumeService.consume2recall(messageTemplate); + public void recall(RecallTaskInfo recallTaskInfo) { + consumeService.consume2recall(recallTaskInfo); } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java index 4319111..ba19f70 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/Receiver.java @@ -2,11 +2,11 @@ package com.java3y.austin.handler.receiver.kafka; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.handler.utils.GroupIdMappingUtils; import com.java3y.austin.support.constans.MessageQueuePipeline; -import com.java3y.austin.support.domain.MessageTemplate; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; @@ -64,8 +64,8 @@ public class Receiver { public void recall(ConsumerRecord consumerRecord) { Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); if (kafkaMessage.isPresent()) { - MessageTemplate messageTemplate = JSON.parseObject(kafkaMessage.get(), MessageTemplate.class); - consumeService.consume2recall(messageTemplate); + RecallTaskInfo recallTaskInfo = JSON.parseObject(kafkaMessage.get(), RecallTaskInfo.class); + consumeService.consume2recall(recallTaskInfo); } } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java index 2b50a77..35ec130 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rabbit/RabbitMqReceiver.java @@ -1,10 +1,10 @@ package com.java3y.austin.handler.receiver.rabbit; import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.support.constans.MessageQueuePipeline; -import com.java3y.austin.support.domain.MessageTemplate; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; @@ -51,8 +51,8 @@ public class RabbitMqReceiver { consumeService.consume2Send(taskInfoLists); } else if (MSG_TYPE_RECALL.equals(messageType)) { // 处理撤回消息 - MessageTemplate messageTemplate = JSON.parseObject(messageContent, MessageTemplate.class); - consumeService.consume2recall(messageTemplate); + RecallTaskInfo recallTaskInfo = JSON.parseObject(messageContent, RecallTaskInfo.class); + consumeService.consume2recall(recallTaskInfo); } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java index 66eeb31..9f2a8e7 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/rocketmq/RocketMqRecallReceiver.java @@ -1,9 +1,9 @@ package com.java3y.austin.handler.receiver.rocketmq; import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.support.constans.MessageQueuePipeline; -import com.java3y.austin.support.domain.MessageTemplate; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; @@ -35,7 +35,7 @@ public class RocketMqRecallReceiver implements RocketMQListener { if (StringUtils.isBlank(message)) { return; } - MessageTemplate messageTemplate = JSON.parseObject(message, MessageTemplate.class); - consumeService.consume2recall(messageTemplate); + RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class); + consumeService.consume2recall(recallTaskInfo); } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/ConsumeService.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/ConsumeService.java index b3ed789..8813bd0 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/ConsumeService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/ConsumeService.java @@ -1,8 +1,8 @@ package com.java3y.austin.handler.receiver.service; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; -import com.java3y.austin.support.domain.MessageTemplate; import java.util.List; @@ -23,10 +23,12 @@ public interface ConsumeService { /** * 从MQ拉到消息进行消费,撤回消息 + * 如果有 recallMessageId ,则优先撤回 recallMessageId + * 如果没有 recallMessageId ,则撤回整个模板的消息 * - * @param messageTemplate + * @param recallTaskInfo */ - void consume2recall(MessageTemplate messageTemplate); + void consume2recall(RecallTaskInfo recallTaskInfo); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java index 7ab3bb3..68659cd 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java @@ -3,6 +3,7 @@ package com.java3y.austin.handler.receiver.service.impl; import cn.hutool.core.collection.CollUtil; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.LogParam; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.handler.handler.HandlerHolder; @@ -10,7 +11,6 @@ import com.java3y.austin.handler.pending.Task; import com.java3y.austin.handler.pending.TaskPendingHolder; import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.handler.utils.GroupIdMappingUtils; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.LogUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -48,8 +48,8 @@ public class ConsumeServiceImpl implements ConsumeService { } @Override - public void consume2recall(MessageTemplate messageTemplate) { - logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(messageTemplate).build()); - handlerHolder.route(messageTemplate.getSendChannel()).recall(messageTemplate); + public void consume2recall(RecallTaskInfo recallTaskInfo) { + logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(recallTaskInfo).build()); + handlerHolder.route(recallTaskInfo.getSendChannel()).recall(recallTaskInfo); } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/springeventbus/SpringEventBusReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/springeventbus/SpringEventBusReceiver.java index 868497f..70d72d4 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/springeventbus/SpringEventBusReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/springeventbus/SpringEventBusReceiver.java @@ -1,9 +1,9 @@ package com.java3y.austin.handler.receiver.springeventbus; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.support.constans.MessageQueuePipeline; -import com.java3y.austin.support.domain.MessageTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -27,7 +27,7 @@ public class SpringEventBusReceiver { consumeService.consume2Send(lists); } - public void recall(MessageTemplate messageTemplate) { - consumeService.consume2recall(messageTemplate); + public void recall(RecallTaskInfo recallTaskInfo) { + consumeService.consume2recall(recallTaskInfo); } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/springeventbus/SpringEventBusReceiverListener.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/springeventbus/SpringEventBusReceiverListener.java index f57ecff..0fc1a44 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/springeventbus/SpringEventBusReceiverListener.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/springeventbus/SpringEventBusReceiverListener.java @@ -1,9 +1,9 @@ package com.java3y.austin.handler.receiver.springeventbus; import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.support.constans.MessageQueuePipeline; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.mq.springeventbus.AustinSpringEventBusEvent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -36,7 +36,7 @@ public class SpringEventBusReceiverListener implements ApplicationListener send(SmsParam smsParam) { diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/recall/RecallAssembleAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/recall/RecallAssembleAction.java new file mode 100644 index 0000000..ebde13a --- /dev/null +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/recall/RecallAssembleAction.java @@ -0,0 +1,54 @@ +package com.java3y.austin.service.api.impl.action.recall; + +import com.google.common.base.Throwables; +import com.java3y.austin.common.constant.CommonConstant; +import com.java3y.austin.common.domain.RecallTaskInfo; +import com.java3y.austin.common.enums.RespStatusEnum; +import com.java3y.austin.common.vo.BasicResultVO; +import com.java3y.austin.service.api.impl.domain.RecallTaskModel; +import com.java3y.austin.support.dao.MessageTemplateDao; +import com.java3y.austin.support.domain.MessageTemplate; +import com.java3y.austin.support.pipeline.BusinessProcess; +import com.java3y.austin.support.pipeline.ProcessContext; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.Optional; + +/** + * @author 3y + * 组装撤回参数 + */ +@Slf4j +@Service +public class RecallAssembleAction implements BusinessProcess { + + @Autowired + private MessageTemplateDao messageTemplateDao; + + @Override + public void process(ProcessContext context) { + RecallTaskModel recallTaskModel = context.getProcessModel(); + Long messageTemplateId = recallTaskModel.getMessageTemplateId(); + try { + Optional messageTemplate = messageTemplateDao.findById(messageTemplateId); + if (!messageTemplate.isPresent() || messageTemplate.get().getIsDeleted().equals(CommonConstant.TRUE)) { + context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.TEMPLATE_NOT_FOUND)); + return; + } + + RecallTaskInfo recallTaskInfo = RecallTaskInfo.builder().messageTemplateId(messageTemplateId) + .recallMessageId(recallTaskModel.getRecallMessageId()) + .sendAccount(messageTemplate.get().getSendAccount()) + .sendChannel(messageTemplate.get().getSendChannel()) + .build(); + recallTaskModel.setRecallTaskInfo(recallTaskInfo); + + } catch (Exception e) { + context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); + log.error("assemble recall task fail! templateId:{}, e:{}", messageTemplateId, Throwables.getStackTraceAsString(e)); + } + } + +} diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/recall/RecallMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/recall/RecallMqAction.java new file mode 100644 index 0000000..c6b3275 --- /dev/null +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/recall/RecallMqAction.java @@ -0,0 +1,49 @@ +package com.java3y.austin.service.api.impl.action.recall; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.google.common.base.Throwables; +import com.java3y.austin.common.domain.RecallTaskInfo; +import com.java3y.austin.common.enums.RespStatusEnum; +import com.java3y.austin.common.vo.BasicResultVO; +import com.java3y.austin.service.api.impl.domain.RecallTaskModel; +import com.java3y.austin.support.mq.SendMqService; +import com.java3y.austin.support.pipeline.BusinessProcess; +import com.java3y.austin.support.pipeline.ProcessContext; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +/** + * @author 3y + * 将撤回消息发送到MQ + */ +@Slf4j +@Service +public class RecallMqAction implements BusinessProcess { + @Autowired + private SendMqService sendMqService; + + @Value("${austin.business.recall.topic.name}") + private String austinRecall; + @Value("${austin.business.tagId.value}") + private String tagId; + + @Value("${austin.mq.pipeline}") + private String mqPipeline; + + @Override + public void process(ProcessContext context) { + RecallTaskInfo recallTaskInfo = context.getProcessModel().getRecallTaskInfo(); + try { + String message = JSON.toJSONString(recallTaskInfo, new SerializerFeature[]{SerializerFeature.WriteClassName}); + sendMqService.send(austinRecall, message, tagId); + } catch (Exception e) { + context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); + log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e) + , JSON.toJSONString(recallTaskInfo)); + } + } + +} diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendAfterCheckAction.java similarity index 86% rename from austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java rename to austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendAfterCheckAction.java index 3dd04fa..306ecfa 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendAfterCheckAction.java @@ -1,10 +1,9 @@ -package com.java3y.austin.service.api.impl.action; +package com.java3y.austin.service.api.impl.action.send; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ReUtil; import com.alibaba.fastjson.JSON; -import com.java3y.austin.common.domain.SimpleTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.IdType; import com.java3y.austin.common.enums.RespStatusEnum; @@ -28,13 +27,15 @@ import java.util.stream.Collectors; */ @Slf4j @Service -public class AfterParamCheckAction implements BusinessProcess { +public class SendAfterCheckAction implements BusinessProcess { + /** + * 邮件和手机号正则 + */ + public static final HashMap CHANNEL_REGEX_EXP = new HashMap<>(); public static final String PHONE_REGEX_EXP = "^((13[0-9])|(14[5,7,9])|(15[0-3,5-9])|(166)|(17[0-9])|(18[0-9])|(19[1,8,9]))\\d{8}$"; public static final String EMAIL_REGEX_EXP = "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"; - public static final HashMap CHANNEL_REGEX_EXP = new HashMap<>(); - static { CHANNEL_REGEX_EXP.put(IdType.PHONE.getCode(), PHONE_REGEX_EXP); CHANNEL_REGEX_EXP.put(IdType.EMAIL.getCode(), EMAIL_REGEX_EXP); @@ -46,16 +47,13 @@ public class AfterParamCheckAction implements BusinessProcess { SendTaskModel sendTaskModel = context.getProcessModel(); List taskInfo = sendTaskModel.getTaskInfo(); - // 1. 过滤掉不合法的手机号、邮件 + // 过滤掉不合法的手机号、邮件 filterIllegalReceiver(taskInfo); - if (CollUtil.isEmpty(taskInfo)) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS, "手机号或邮箱不合法, 无有效的发送任务")); return; } - // 数据组装 - context.setResponse(BasicResultVO.success(taskInfo.stream().map(v -> SimpleTaskInfo.builder().businessId(v.getBusinessId()).messageId(v.getMessageId()).bizId(v.getBizId()).build()).collect(Collectors.toList()))); } /** diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendAssembleAction.java similarity index 90% rename from austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java rename to austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendAssembleAction.java index 831e11a..761d7b7 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendAssembleAction.java @@ -1,4 +1,4 @@ -package com.java3y.austin.service.api.impl.action; +package com.java3y.austin.service.api.impl.action.send; import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.StrUtil; @@ -13,7 +13,6 @@ import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.vo.BasicResultVO; import com.java3y.austin.service.api.domain.MessageParam; -import com.java3y.austin.service.api.enums.BusinessCode; import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.dao.MessageTemplateDao; import com.java3y.austin.support.domain.MessageTemplate; @@ -35,7 +34,7 @@ import java.util.*; */ @Slf4j @Service -public class AssembleAction implements BusinessProcess { +public class SendAssembleAction implements BusinessProcess { private static final String LINK_NAME = "url"; @@ -53,12 +52,8 @@ public class AssembleAction implements BusinessProcess { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.TEMPLATE_NOT_FOUND)); return; } - if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) { - List taskInfos = assembleTaskInfo(sendTaskModel, messageTemplate.get()); - sendTaskModel.setTaskInfo(taskInfos); - } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) { - sendTaskModel.setMessageTemplate(messageTemplate.get()); - } + List taskInfos = assembleTaskInfo(sendTaskModel, messageTemplate.get()); + sendTaskModel.setTaskInfo(taskInfos); } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); log.error("assemble task fail! templateId:{}, e:{}", messageTemplateId, Throwables.getStackTraceAsString(e)); diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendMqAction.java similarity index 62% rename from austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java rename to austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendMqAction.java index 85e80c9..6287f3e 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendMqAction.java @@ -1,12 +1,13 @@ -package com.java3y.austin.service.api.impl.action; +package com.java3y.austin.service.api.impl.action.send; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; +import com.java3y.austin.common.domain.SimpleTaskInfo; +import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.vo.BasicResultVO; -import com.java3y.austin.service.api.enums.BusinessCode; import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.mq.SendMqService; import com.java3y.austin.support.pipeline.BusinessProcess; @@ -16,9 +17,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import java.util.List; +import java.util.stream.Collectors; + /** * @author 3y - * 将消息发送到MQ + * 1. 将消息发送到MQ + * 2. 返回拼装好的messageId给到接口调用方 */ @Slf4j @Service @@ -31,30 +36,25 @@ public class SendMqAction implements BusinessProcess { @Value("${austin.business.topic.name}") private String sendMessageTopic; - @Value("${austin.business.recall.topic.name}") - private String austinRecall; @Value("${austin.business.tagId.value}") private String tagId; @Value("${austin.mq.pipeline}") private String mqPipeline; - @Override public void process(ProcessContext context) { SendTaskModel sendTaskModel = context.getProcessModel(); + List taskInfo = sendTaskModel.getTaskInfo(); try { - if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) { - String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName}); - sendMqService.send(sendMessageTopic, message, tagId); - } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) { - String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName}); - sendMqService.send(austinRecall, message, tagId); - } + String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName}); + sendMqService.send(sendMessageTopic, message, tagId); + + context.setResponse(BasicResultVO.success(taskInfo.stream().map(v -> SimpleTaskInfo.builder().businessId(v.getBusinessId()).messageId(v.getMessageId()).bizId(v.getBizId()).build()).collect(Collectors.toList()))); } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e) - , JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator()))); + , JSON.toJSONString(CollUtil.getFirst(taskInfo.listIterator()))); } } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendPreCheckAction.java similarity index 85% rename from austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java rename to austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendPreCheckAction.java index 4ec8ad1..75968d6 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/send/SendPreCheckAction.java @@ -1,4 +1,4 @@ -package com.java3y.austin.service.api.impl.action; +package com.java3y.austin.service.api.impl.action.send; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; @@ -19,11 +19,11 @@ import java.util.stream.Collectors; /** * @author 3y * @date 2021/11/22 - * @description 前置参数校验 + * @description 发送消息:前置参数校验 */ @Slf4j @Service -public class PreParamCheckAction implements BusinessProcess { +public class SendPreCheckAction implements BusinessProcess { @Override public void process(ProcessContext context) { @@ -32,13 +32,13 @@ public class PreParamCheckAction implements BusinessProcess { Long messageTemplateId = sendTaskModel.getMessageTemplateId(); List messageParamList = sendTaskModel.getMessageParamList(); - // 1.没有传入 消息模板Id 或者 messageParam + // 1. 没有传入 消息模板Id 或者 messageParam if (Objects.isNull(messageTemplateId) || CollUtil.isEmpty(messageParamList)) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS, "模板ID或参数列表为空")); return; } - // 2.过滤 receiver=null 的messageParam + // 2. 过滤 receiver=null 的messageParam List resultMessageParamList = messageParamList.stream() .filter(messageParam -> !StrUtil.isBlank(messageParam.getReceiver())) .collect(Collectors.toList()); @@ -47,7 +47,7 @@ public class PreParamCheckAction implements BusinessProcess { return; } - // 3.过滤receiver大于100的请求 + // 3. 过滤 receiver 大于100的请求 if (resultMessageParamList.stream().anyMatch(messageParam -> messageParam.getReceiver().split(StrUtil.COMMA).length > AustinConstant.BATCH_RECEIVER_SIZE)) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.TOO_MANY_RECEIVER)); return; diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java index ad8e045..0fd184c 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java @@ -2,10 +2,12 @@ package com.java3y.austin.service.api.impl.config; import com.java3y.austin.service.api.enums.BusinessCode; -import com.java3y.austin.service.api.impl.action.AfterParamCheckAction; -import com.java3y.austin.service.api.impl.action.AssembleAction; -import com.java3y.austin.service.api.impl.action.PreParamCheckAction; -import com.java3y.austin.service.api.impl.action.SendMqAction; +import com.java3y.austin.service.api.impl.action.recall.RecallAssembleAction; +import com.java3y.austin.service.api.impl.action.recall.RecallMqAction; +import com.java3y.austin.service.api.impl.action.send.SendAfterCheckAction; +import com.java3y.austin.service.api.impl.action.send.SendAssembleAction; +import com.java3y.austin.service.api.impl.action.send.SendMqAction; +import com.java3y.austin.service.api.impl.action.send.SendPreCheckAction; import com.java3y.austin.support.pipeline.ProcessController; import com.java3y.austin.support.pipeline.ProcessTemplate; import org.springframework.beans.factory.annotation.Autowired; @@ -25,14 +27,20 @@ import java.util.Map; public class PipelineConfig { @Autowired - private PreParamCheckAction preParamCheckAction; + private SendPreCheckAction sendPreCheckAction; @Autowired - private AssembleAction assembleAction; + private SendAssembleAction sendAssembleAction; @Autowired - private AfterParamCheckAction afterParamCheckAction; + private SendAfterCheckAction sendAfterCheckAction; @Autowired private SendMqAction sendMqAction; + @Autowired + private RecallAssembleAction recallAssembleAction; + @Autowired + private RecallMqAction recallMqAction; + + /** * 普通发送执行流程 * 1. 前置参数校验 @@ -45,8 +53,8 @@ public class PipelineConfig { @Bean("commonSendTemplate") public ProcessTemplate commonSendTemplate() { ProcessTemplate processTemplate = new ProcessTemplate(); - processTemplate.setProcessList(Arrays.asList(preParamCheckAction, assembleAction, - afterParamCheckAction, sendMqAction)); + processTemplate.setProcessList(Arrays.asList(sendPreCheckAction, sendAssembleAction, + sendAfterCheckAction, sendMqAction)); return processTemplate; } @@ -60,7 +68,7 @@ public class PipelineConfig { @Bean("recallMessageTemplate") public ProcessTemplate recallMessageTemplate() { ProcessTemplate processTemplate = new ProcessTemplate(); - processTemplate.setProcessList(Arrays.asList(assembleAction, sendMqAction)); + processTemplate.setProcessList(Arrays.asList(recallAssembleAction, recallMqAction)); return processTemplate; } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/RecallTaskModel.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/RecallTaskModel.java new file mode 100644 index 0000000..00c7d36 --- /dev/null +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/RecallTaskModel.java @@ -0,0 +1,37 @@ +package com.java3y.austin.service.api.impl.domain; + +import com.java3y.austin.common.domain.RecallTaskInfo; +import com.java3y.austin.support.pipeline.ProcessModel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author 3y + * @date 2021/11/22 + * @description 发送消息任务模型 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class RecallTaskModel implements ProcessModel { + + /** + * 消息模板Id + */ + private Long messageTemplateId; + + /** + * 需要撤回的消息ids + */ + private List recallMessageId; + + /** + * 撤回任务 domain + */ + private RecallTaskInfo recallTaskInfo; +} diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/SendTaskModel.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/SendTaskModel.java index 2182430..0a938a4 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/SendTaskModel.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/SendTaskModel.java @@ -2,7 +2,6 @@ package com.java3y.austin.service.api.impl.domain; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.service.api.domain.MessageParam; -import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.pipeline.ProcessModel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -37,9 +36,4 @@ public class SendTaskModel implements ProcessModel { */ private List taskInfo; - /** - * 撤回任务的信息 - */ - private MessageTemplate messageTemplate; - } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java index 058413f..a1c3a8b 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java @@ -4,7 +4,7 @@ import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.vo.BasicResultVO; import com.java3y.austin.service.api.domain.SendRequest; import com.java3y.austin.service.api.domain.SendResponse; -import com.java3y.austin.service.api.impl.domain.SendTaskModel; +import com.java3y.austin.service.api.impl.domain.RecallTaskModel; import com.java3y.austin.service.api.service.RecallService; import com.java3y.austin.support.pipeline.ProcessContext; import com.java3y.austin.support.pipeline.ProcessController; @@ -26,15 +26,14 @@ public class RecallServiceImpl implements RecallService { @Override public SendResponse recall(SendRequest sendRequest) { - if(ObjectUtils.isEmpty(sendRequest)){ + if (ObjectUtils.isEmpty(sendRequest)) { return new SendResponse(RespStatusEnum.CLIENT_BAD_PARAMETERS.getCode(), RespStatusEnum.CLIENT_BAD_PARAMETERS.getMsg(), null); } - SendTaskModel sendTaskModel = SendTaskModel.builder() - .messageTemplateId(sendRequest.getMessageTemplateId()) - .build(); + RecallTaskModel recallTaskModel = RecallTaskModel.builder().messageTemplateId(sendRequest.getMessageTemplateId()) + .recallMessageId(sendRequest.getRecallMessageIds()).build(); ProcessContext context = ProcessContext.builder() .code(sendRequest.getCode()) - .processModel(sendTaskModel) + .processModel(recallTaskModel) .needBreak(false) .response(BasicResultVO.success()).build(); ProcessContext process = processController.process(context); diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/SendServiceImpl.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/SendServiceImpl.java index f5b1325..f217baa 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/SendServiceImpl.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/SendServiceImpl.java @@ -32,7 +32,7 @@ public class SendServiceImpl implements SendService { @Override @OperationLog(bizType = "SendService#send", bizId = "#sendRequest.messageTemplateId", msg = "#sendRequest") public SendResponse send(SendRequest sendRequest) { - if(ObjectUtils.isEmpty(sendRequest)){ + if (ObjectUtils.isEmpty(sendRequest)) { return new SendResponse(RespStatusEnum.CLIENT_BAD_PARAMETERS.getCode(), RespStatusEnum.CLIENT_BAD_PARAMETERS.getMsg(), null); } @@ -55,6 +55,10 @@ public class SendServiceImpl implements SendService { @Override @OperationLog(bizType = "SendService#batchSend", bizId = "#batchSendRequest.messageTemplateId", msg = "#batchSendRequest") public SendResponse batchSend(BatchSendRequest batchSendRequest) { + if (ObjectUtils.isEmpty(batchSendRequest)) { + return new SendResponse(RespStatusEnum.CLIENT_BAD_PARAMETERS.getCode(), RespStatusEnum.CLIENT_BAD_PARAMETERS.getMsg(), null); + } + SendTaskModel sendTaskModel = SendTaskModel.builder() .messageTemplateId(batchSendRequest.getMessageTemplateId()) .messageParamList(batchSendRequest.getMessageParamList()) @@ -68,7 +72,7 @@ public class SendServiceImpl implements SendService { ProcessContext process = processController.process(context); - return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg(), null); + return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg(), (List) process.getResponse().getData()); } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/TraceServiceImpl.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/TraceServiceImpl.java index 246d891..cd1a48b 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/TraceServiceImpl.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/TraceServiceImpl.java @@ -6,7 +6,6 @@ import com.alibaba.fastjson.JSON; import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.domain.SimpleAnchorInfo; import com.java3y.austin.common.enums.RespStatusEnum; -import com.java3y.austin.service.api.domain.SendResponse; import com.java3y.austin.service.api.domain.TraceResponse; import com.java3y.austin.service.api.service.TraceService; import com.java3y.austin.support.utils.RedisUtils; diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendRequest.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendRequest.java index 4484a83..8461e44 100644 --- a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendRequest.java +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendRequest.java @@ -7,6 +7,8 @@ import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import java.util.List; + /** * 发送/撤回接口的参数 * @@ -21,6 +23,8 @@ public class SendRequest { /** * 执行业务类型 + * + * @see com.java3y.austin.service.api.enums.BusinessCode * send:发送消息 * recall:撤回消息 */ @@ -39,5 +43,10 @@ public class SendRequest { */ private MessageParam messageParam; + /** + * 需要撤回的消息messageIds (可根据发送接口返回的消息messageId进行撤回) + * 【可选】 + */ + private List recallMessageIds; } diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/service/RecallService.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/RecallService.java index 7795f4f..d379854 100644 --- a/austin-service-api/src/main/java/com/java3y/austin/service/api/service/RecallService.java +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/RecallService.java @@ -12,7 +12,9 @@ public interface RecallService { /** - * 根据模板ID撤回消息 + * 根据 模板ID 或消息id 撤回消息 + * 如果只传入 messageTemplateId,则会撤回整个模板下发的消息 + * 如果还传入 recallMessageId,则优先撤回该 ids 的消息 * * @param sendRequest * @return diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/service/SendService.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/SendService.java index 7de2c69..dcba4fb 100644 --- a/austin-service-api/src/main/java/com/java3y/austin/service/api/service/SendService.java +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/SendService.java @@ -15,8 +15,8 @@ public interface SendService { /** * 单文案发送接口 * - * @param sendRequest - * @return + * @param sendRequest eg: {"code":"send","messageParam":{"bizId":null,"extra":null,"receiver":"123@qq.com","variables":null},"messageTemplateId":17,"recallMessageId":null} + * @return SendResponse eg: {"code":"0","data":[{"bizId":"ecZim2-FzdejNSY-sqgCM","businessId":2000001720230815,"messageId":"ecZim2-FzdejNSY-sqgCM"}],"msg":"操作成功"} */ SendResponse send(SendRequest sendRequest); diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/service/TraceService.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/TraceService.java index 5e875bd..2173b51 100644 --- a/austin-service-api/src/main/java/com/java3y/austin/service/api/service/TraceService.java +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/TraceService.java @@ -4,6 +4,7 @@ import com.java3y.austin.service.api.domain.TraceResponse; /** * 链路查询接口 + * * @Author: sky * @Date: 2023/7/13 13:35 * @Description: TraceService @@ -13,6 +14,7 @@ public interface TraceService { /** * 基于消息 ID 查询 链路结果 + * * @param messageId * @return */ diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java index 23eba17..d8427cf 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java @@ -28,7 +28,6 @@ public class AustinSink implements SinkFunction { @Override public void invoke(AnchorInfo anchorInfo, Context context) throws Exception { realTimeData(anchorInfo); - offlineDate(anchorInfo); } @@ -43,14 +42,16 @@ public class AustinSink implements SinkFunction { try { LettuceRedisUtils.pipeline(redisAsyncCommands -> { List> redisFutures = new ArrayList<>(); + /** - * 1.构建messageId维度的链路信息 数据结构list:{key,list} + * 0.构建messageId维度的链路信息 数据结构list:{key,list} * key:Austin:MessageId:{messageId},listValue:[{timestamp,state,businessId},{timestamp,state,businessId}] */ String redisMessageKey = StrUtil.join(StrUtil.COLON, AustinConstant.CACHE_KEY_PREFIX, AustinConstant.MESSAGE_ID, info.getMessageId()); SimpleAnchorInfo messageAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build(); redisFutures.add(redisAsyncCommands.lpush(redisMessageKey.getBytes(), JSON.toJSONString(messageAnchorInfo).getBytes())); redisFutures.add(redisAsyncCommands.expire(redisMessageKey.getBytes(), Duration.ofDays(3).toMillis() / 1000)); + /** * 1.构建userId维度的链路信息 数据结构list:{key,list} * key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}] @@ -77,15 +78,4 @@ public class AustinSink implements SinkFunction { log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e)); } } - - /** - * 离线数据存入hive - * - * @param anchorInfo - */ - private void offlineDate(AnchorInfo anchorInfo) { - - } - - } diff --git a/austin-support/src/main/java/com/java3y/austin/support/config/SupportThreadPoolConfig.java b/austin-support/src/main/java/com/java3y/austin/support/config/SupportThreadPoolConfig.java index 85c00fb..85b2129 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/config/SupportThreadPoolConfig.java +++ b/austin-support/src/main/java/com/java3y/austin/support/config/SupportThreadPoolConfig.java @@ -4,6 +4,7 @@ import cn.hutool.core.thread.ExecutorBuilder; import com.java3y.austin.common.constant.ThreadPoolConstant; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -22,7 +23,7 @@ public class SupportThreadPoolConfig { return ExecutorBuilder.create() .setCorePoolSize(ThreadPoolConstant.SINGLE_CORE_POOL_SIZE) .setMaxPoolSize(ThreadPoolConstant.SINGLE_MAX_POOL_SIZE) - .setWorkQueue(ThreadPoolConstant.BIG_BLOCKING_QUEUE) + .setWorkQueue(new LinkedBlockingQueue(ThreadPoolConstant.BIG_QUEUE_SIZE)) .setHandler(new ThreadPoolExecutor.CallerRunsPolicy()) .setAllowCoreThreadTimeOut(true) .setKeepAliveTime(ThreadPoolConstant.SMALL_KEEP_LIVE_TIME, TimeUnit.SECONDS) diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusListener.java b/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusListener.java index 172ba4f..27305d3 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusListener.java +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusListener.java @@ -1,8 +1,8 @@ package com.java3y.austin.support.mq.eventbus; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; -import com.java3y.austin.support.domain.MessageTemplate; import java.util.List; @@ -23,7 +23,7 @@ public interface EventBusListener { /** * 撤回消息 * - * @param messageTemplate + * @param recallTaskInfo */ - void recall(MessageTemplate messageTemplate); + void recall(RecallTaskInfo recallTaskInfo); } diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusSendMqServiceImpl.java index aa7caad..04ffbf2 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusSendMqServiceImpl.java +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/eventbus/EventBusSendMqServiceImpl.java @@ -2,6 +2,7 @@ package com.java3y.austin.support.mq.eventbus; import com.alibaba.fastjson.JSON; import com.google.common.eventbus.EventBus; +import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.domain.MessageTemplate; @@ -43,7 +44,7 @@ public class EventBusSendMqServiceImpl implements SendMqService { if (topic.equals(sendTopic)) { eventBus.post(JSON.parseArray(jsonValue, TaskInfo.class)); } else if (topic.equals(recallTopic)) { - eventBus.post(JSON.parseObject(jsonValue, MessageTemplate.class)); + eventBus.post(JSON.parseObject(jsonValue, RecallTaskInfo.class)); } } diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/AccessTokenUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/AccessTokenUtils.java index de08eb3..2a0417a 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/AccessTokenUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/AccessTokenUtils.java @@ -11,6 +11,7 @@ import com.dingtalk.api.request.OapiGettokenRequest; import com.dingtalk.api.response.OapiGettokenResponse; import com.google.common.base.Throwables; import com.java3y.austin.common.constant.CommonConstant; +import com.java3y.austin.common.constant.SendChanelUrlConstant; import com.java3y.austin.common.dto.account.DingDingWorkNoticeAccount; import com.java3y.austin.common.dto.account.GeTuiAccount; import com.java3y.austin.support.dto.GeTuiTokenResultDTO; @@ -19,15 +20,12 @@ import lombok.extern.slf4j.Slf4j; /** * 获取第三发token工具类 + * + * @author wuhui */ @Slf4j public class AccessTokenUtils { - /** - * 钉钉获取token地址 - */ - private static final String DING_DING_TOKEN_URL = "https://oapi.dingtalk.com/gettoken"; - /** * 获取钉钉 access_token * @@ -37,7 +35,7 @@ public class AccessTokenUtils { public static String getDingDingAccessToken(DingDingWorkNoticeAccount account) { String accessToken = ""; try { - DingTalkClient client = new DefaultDingTalkClient(DING_DING_TOKEN_URL); + DingTalkClient client = new DefaultDingTalkClient(SendChanelUrlConstant.DING_DING_TOKEN_URL); OapiGettokenRequest req = new OapiGettokenRequest(); req.setAppkey(account.getAppKey()); req.setAppsecret(account.getAppSecret()); @@ -59,7 +57,7 @@ public class AccessTokenUtils { public static String getGeTuiAccessToken(GeTuiAccount account) { String accessToken = ""; try { - String url = "https://restapi.getui.com/v2/" + account.getAppId() + "/auth"; + String url = SendChanelUrlConstant.GE_TUI_BASE_URL + account.getAppId() + SendChanelUrlConstant.GE_TUI_AUTH; String time = String.valueOf(System.currentTimeMillis()); String digest = SecureUtil.sha256().digestHex(account.getAppKey() + time + account.getMasterSecret()); QueryTokenParamDTO param = QueryTokenParamDTO.builder() diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/AccountUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/AccountUtils.java index 7640368..f2a983a 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/AccountUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/AccountUtils.java @@ -1,12 +1,13 @@ package com.java3y.austin.support.utils; + import cn.binarywang.wx.miniapp.api.WxMaService; import cn.binarywang.wx.miniapp.api.impl.WxMaServiceImpl; import cn.binarywang.wx.miniapp.config.impl.WxMaRedisBetterConfigImpl; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; +import com.java3y.austin.common.constant.AccessTokenPrefixConstant; import com.java3y.austin.common.constant.CommonConstant; -import com.java3y.austin.common.constant.SendAccountConstant; import com.java3y.austin.common.dto.account.WeChatMiniProgramAccount; import com.java3y.austin.common.dto.account.WeChatOfficialAccount; import com.java3y.austin.common.dto.account.sms.SmsAccount; @@ -15,6 +16,7 @@ import com.java3y.austin.support.dao.ChannelAccountDao; import com.java3y.austin.support.domain.ChannelAccount; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.common.redis.RedisTemplateWxRedisOps; + import me.chanjar.weixin.mp.api.WxMpService; import me.chanjar.weixin.mp.api.impl.WxMpServiceImpl; import me.chanjar.weixin.mp.config.impl.WxMpRedisConfigImpl; @@ -119,7 +121,7 @@ public class AccountUtils { */ public WxMpService initOfficialAccountService(WeChatOfficialAccount officialAccount) { WxMpService wxMpService = new WxMpServiceImpl(); - WxMpRedisConfigImpl config = new WxMpRedisConfigImpl(redisTemplateWxRedisOps(), SendAccountConstant.OFFICIAL_ACCOUNT_ACCESS_TOKEN_PREFIX); + WxMpRedisConfigImpl config = new WxMpRedisConfigImpl(redisTemplateWxRedisOps(), AccessTokenPrefixConstant.OFFICIAL_ACCOUNT_ACCESS_TOKEN_PREFIX); config.setAppId(officialAccount.getAppId()); config.setSecret(officialAccount.getSecret()); config.setToken(officialAccount.getToken()); @@ -136,7 +138,7 @@ public class AccountUtils { */ private WxMaService initMiniProgramService(WeChatMiniProgramAccount miniProgramAccount) { WxMaService wxMaService = new WxMaServiceImpl(); - WxMaRedisBetterConfigImpl config = new WxMaRedisBetterConfigImpl(redisTemplateWxRedisOps(), SendAccountConstant.MINI_PROGRAM_TOKEN_PREFIX); + WxMaRedisBetterConfigImpl config = new WxMaRedisBetterConfigImpl(redisTemplateWxRedisOps(), AccessTokenPrefixConstant.MINI_PROGRAM_TOKEN_PREFIX); config.setAppid(miniProgramAccount.getAppId()); config.setSecret(miniProgramAccount.getAppSecret()); config.useStableAccessToken(true); diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/AustinFileUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/AustinFileUtils.java index 3c6873d..e7be734 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/AustinFileUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/AustinFileUtils.java @@ -43,7 +43,7 @@ public class AustinFileUtils { /** * 读取 远程链接集合 返回有效的File对象集合 * - * @param path 文件路径 + * @param path 文件路径 * @param remoteUrls cdn/oss文件访问链接集合 * @return */ diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/TaskInfoUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/TaskInfoUtils.java index 878b24a..a70f5a1 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/TaskInfoUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/TaskInfoUtils.java @@ -19,11 +19,13 @@ public class TaskInfoUtils { /** * 生成任务唯一Id + * * @return */ public static String generateMessageId() { return IdUtil.nanoId(); } + /** * 生成BusinessId * 模板类型+模板ID+当天日期 diff --git a/austin-web/src/main/java/com/java3y/austin/AustinApplication.java b/austin-web/src/main/java/com/java3y/austin/AustinApplication.java index fc7e5ed..7676bb6 100644 --- a/austin-web/src/main/java/com/java3y/austin/AustinApplication.java +++ b/austin-web/src/main/java/com/java3y/austin/AustinApplication.java @@ -1,6 +1,13 @@ package com.java3y.austin; +import com.java3y.austin.common.constant.AustinConstant; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; +import org.springframework.boot.ansi.AnsiColor; +import org.springframework.boot.ansi.AnsiOutput; +import org.springframework.boot.ansi.AnsiStyle; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -8,9 +15,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; * @author 3y */ @SpringBootApplication -public class AustinApplication { - public static void main(String[] args) { +@Slf4j +public class AustinApplication implements CommandLineRunner { + + @Value("${server.port}") + private String serverPort; + public static void main(String[] args) { /** * 如果你需要启动Apollo动态配置 * 1、启动apollo @@ -19,5 +30,13 @@ public class AustinApplication { */ System.setProperty("apollo.config-service", "http://austin-apollo-config:8080"); SpringApplication.run(AustinApplication.class, args); + + } + + @Override + public void run(String... args) throws Exception { + log.info(AnsiOutput.toString(AustinConstant.PROJECT_BANNER, "\n", AnsiColor.GREEN, AustinConstant.PROJECT_NAME, AnsiColor.DEFAULT, AnsiStyle.FAINT)); + log.info("Austin start succeeded, Index >> http://127.0.0.1:{}/", serverPort); + log.info("Austin start succeeded, Swagger Url >> http://127.0.0.1:{}/swagger-ui/index.html", serverPort); } } diff --git a/austin-web/src/main/java/com/java3y/austin/web/controller/MiniProgramController.java b/austin-web/src/main/java/com/java3y/austin/web/controller/MiniProgramController.java index 6ae5f2b..c548b09 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/controller/MiniProgramController.java +++ b/austin-web/src/main/java/com/java3y/austin/web/controller/MiniProgramController.java @@ -4,6 +4,7 @@ package com.java3y.austin.web.controller; import cn.binarywang.wx.miniapp.api.WxMaService; import cn.hutool.http.HttpUtil; import com.google.common.base.Throwables; +import com.java3y.austin.common.constant.SendChanelUrlConstant; import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.support.utils.AccountUtils; import com.java3y.austin.web.annotation.AustinAspect; @@ -94,7 +95,8 @@ public class MiniProgramController { @GetMapping("/sync/openid") @ApiOperation("登录凭证校验") public String syncOpenId(String code, String appId, String secret) { - String url = "https://api.weixin.qq.com/sns/jscode2session?appid=" + appId + "&secret=" + secret + "&js_code=" + code + "&grant_type=authorization_code"; + String url = SendChanelUrlConstant.WE_CHAT_MINI_PROGRAM_OPENID_SYNC + .replace("", appId).replace("", code).replace("", secret); return HttpUtil.get(url); } diff --git a/austin-web/src/main/java/com/java3y/austin/web/service/impl/DataServiceImpl.java b/austin-web/src/main/java/com/java3y/austin/web/service/impl/DataServiceImpl.java index 4f99809..0f981b7 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/service/impl/DataServiceImpl.java +++ b/austin-web/src/main/java/com/java3y/austin/web/service/impl/DataServiceImpl.java @@ -124,7 +124,7 @@ public class DataServiceImpl implements DataService { return businessId; } - private UserTimeLineVo buildUserTimeLineVo(List sortAnchorList){ + private UserTimeLineVo buildUserTimeLineVo(List sortAnchorList) { // 1. 对相同的businessId进行分类 {"businessId":[{businessId,state,timeStamp},{businessId,state,timeStamp}]} Map> map = MapUtil.newHashMap(); for (SimpleAnchorInfo simpleAnchorInfo : sortAnchorList) { diff --git a/austin-web/src/main/java/com/java3y/austin/web/service/impl/MaterialServiceImpl.java b/austin-web/src/main/java/com/java3y/austin/web/service/impl/MaterialServiceImpl.java index ab6cb3d..d24faeb 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/service/impl/MaterialServiceImpl.java +++ b/austin-web/src/main/java/com/java3y/austin/web/service/impl/MaterialServiceImpl.java @@ -9,8 +9,9 @@ import com.dingtalk.api.DingTalkClient; import com.dingtalk.api.request.OapiMediaUploadRequest; import com.dingtalk.api.response.OapiMediaUploadResponse; import com.google.common.base.Throwables; +import com.java3y.austin.common.constant.AccessTokenPrefixConstant; import com.java3y.austin.common.constant.CommonConstant; -import com.java3y.austin.common.constant.SendAccountConstant; +import com.java3y.austin.common.constant.SendChanelUrlConstant; import com.java3y.austin.common.dto.account.EnterpriseWeChatRobotAccount; import com.java3y.austin.common.enums.EnumUtil; import com.java3y.austin.common.enums.FileType; @@ -44,15 +45,13 @@ public class MaterialServiceImpl implements MaterialService { @Autowired private AccountUtils accountUtils; - private static final String DING_DING_URL = "https://oapi.dingtalk.com/media/upload"; - private static final String ENTERPRISE_WE_CHAT_ROBOT_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key=&type="; @Override public BasicResultVO dingDingMaterialUpload(MultipartFile file, String sendAccount, String fileType) { OapiMediaUploadResponse rsp; try { - String accessToken = redisTemplate.opsForValue().get(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + sendAccount); - DingTalkClient client = new DefaultDingTalkClient(DING_DING_URL); + String accessToken = redisTemplate.opsForValue().get(AccessTokenPrefixConstant.DING_DING_ACCESS_TOKEN_PREFIX + sendAccount); + DingTalkClient client = new DefaultDingTalkClient(SendChanelUrlConstant.DING_DING_UPLOAD_URL); OapiMediaUploadRequest req = new OapiMediaUploadRequest(); FileItem item = new FileItem(new StringBuilder().append(IdUtil.fastSimpleUUID()).append(file.getOriginalFilename()).toString(), file.getInputStream()); @@ -74,7 +73,9 @@ public class MaterialServiceImpl implements MaterialService { try { EnterpriseWeChatRobotAccount weChatRobotAccount = accountUtils.getAccountById(Integer.valueOf(sendAccount), EnterpriseWeChatRobotAccount.class); String key = weChatRobotAccount.getWebhook().substring(weChatRobotAccount.getWebhook().indexOf(CommonConstant.EQUAL_STRING) + 1); - String url = ENTERPRISE_WE_CHAT_ROBOT_URL.replace("", key).replace("", "file"); + + // 企业微信机器人 默认只上传"file"文件类型 + String url = SendChanelUrlConstant.ENTERPRISE_WE_CHAT_ROBOT_URL.replace("", key).replace("", "file"); String response = HttpRequest.post(url) .form(IdUtil.fastSimpleUUID(), SpringFileUtils.getFile(multipartFile)) .execute().body(); diff --git a/austin-web/src/main/resources/logback.xml b/austin-web/src/main/resources/logback.xml index c98ec94..785d5cd 100644 --- a/austin-web/src/main/resources/logback.xml +++ b/austin-web/src/main/resources/logback.xml @@ -4,7 +4,7 @@ austin - + @@ -103,12 +103,23 @@ - - - - - - - + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/doc/docker/mysql/mysql.cnf b/doc/docker/mysql/mysql.cnf new file mode 100644 index 0000000..6aeb365 --- /dev/null +++ b/doc/docker/mysql/mysql.cnf @@ -0,0 +1,5 @@ +[mysqld] +max_connections=10000 +group_concat_max_len=10240 +wait_timeout=300 +interactive_timeout=500 diff --git a/docker-compose.yml b/docker-compose.yml index 309cb55..389d705 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,8 @@ services: command: --init-file /docker-entrypoint-initdb.d/init.sql volumes: - ./doc/sql/austin.sql:/docker-entrypoint-initdb.d/init.sql + - ./doc/docker/mysql:/var/lib/mysql + - ./doc/docker/mysql/mysql.cnf:/etc/mysql/my.cnf ports: - "3306:3306" networks: