From bb31cbb09f605a0782acfc4fc000a42e76b9d00f Mon Sep 17 00:00:00 2001 From: 3y Date: Thu, 9 Jun 2022 21:45:00 +0800 Subject: [PATCH] =?UTF-8?q?=E9=92=89=E9=92=89=E5=B7=A5=E4=BD=9C=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=94=AF=E6=8C=81=E6=92=A4=E5=9B=9E=EF=BC=88=E6=92=A4?= =?UTF-8?q?=E5=9B=9E=E6=B6=88=E6=81=AF=E6=9E=B6=E6=9E=84=E6=A8=A1=E6=9D=BF?= =?UTF-8?q?=E7=BC=96=E5=86=99=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../austin/handler/handler/Handler.java | 11 ++++ .../handler/impl/DingDingRobotHandler.java | 9 ++++ .../impl/DingDingWorkNoticeHandler.java | 51 ++++++++++++++++--- .../handler/handler/impl/EmailHandler.java | 4 ++ .../handler/impl/EnterpriseWeChatHandler.java | 5 ++ .../impl/MiniProgramAccountHandler.java | 4 ++ .../handler/impl/OfficialAccountHandler.java | 5 ++ .../handler/handler/impl/PushHandler.java | 4 ++ .../handler/handler/impl/SmsHandler.java | 4 ++ .../austin/handler/receiver/Receiver.java | 26 +++++++++- .../api/impl/action/AssembleAction.java | 11 ++-- .../service/api/impl/action/SendMqAction.java | 16 ++++-- .../api/impl/config/PipelineConfig.java | 14 +++++ .../api/impl/domain/SendTaskModel.java | 5 ++ .../api/impl/service/RecallServiceImpl.java | 37 ++++++++++++++ .../service/api/domain/SendRequest.java | 8 ++- .../service/api/service/RecallService.java | 22 ++++++++ .../support/domain/MessageTemplate.java | 4 +- .../controller/MessageTemplateController.java | 20 ++++++++ .../src/main/resources/application.properties | 2 + 20 files changed, 244 insertions(+), 18 deletions(-) create mode 100644 austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java create mode 100644 austin-service-api/src/main/java/com/java3y/austin/service/api/service/RecallService.java diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/Handler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/Handler.java index c01b1f0..d9c3165 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/Handler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/Handler.java @@ -1,6 +1,7 @@ package com.java3y.austin.handler.handler; import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.support.domain.MessageTemplate; /** * @author 3y @@ -10,8 +11,18 @@ public interface Handler { /** * 处理器 + * * @param taskInfo */ void doHandler(TaskInfo taskInfo); + /** + * 撤回消息 + * + * @param messageTemplate + * @return + */ + void recall(MessageTemplate messageTemplate); + + } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java index 2c92c98..e378215 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingRobotHandler.java @@ -16,6 +16,7 @@ import com.java3y.austin.handler.domain.dingding.DingDingRobotParam; import com.java3y.austin.handler.domain.dingding.DingDingRobotResult; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; +import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.binary.Base64; @@ -62,6 +63,8 @@ public class DingDingRobotHandler extends BaseHandler implements Handler { return false; } + + private DingDingRobotParam assembleParam(TaskInfo taskInfo) { // 接收者相关 @@ -132,5 +135,11 @@ public class DingDingRobotHandler extends BaseHandler implements Handler { } return sign; } + + + @Override + public void recall(MessageTemplate messageTemplate) { + + } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java index 944384c..7b47c2a 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/DingDingWorkNoticeHandler.java @@ -5,21 +5,26 @@ import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.dingtalk.api.DefaultDingTalkClient; +import com.dingtalk.api.DingTalkClient; import com.dingtalk.api.request.OapiMessageCorpconversationAsyncsendV2Request; +import com.dingtalk.api.request.OapiMessageCorpconversationRecallRequest; import com.dingtalk.api.response.OapiMessageCorpconversationAsyncsendV2Response; +import com.dingtalk.api.response.OapiMessageCorpconversationRecallResponse; import com.google.common.base.Throwables; import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.constant.SendAccountConstant; +import com.java3y.austin.common.domain.LogParam; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.dto.account.DingDingWorkNoticeAccount; -import com.java3y.austin.common.dto.model.DingDingRobotContentModel; import com.java3y.austin.common.dto.model.DingDingWorkContentModel; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.SendMessageType; -import com.java3y.austin.handler.domain.dingding.DingDingRobotParam; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; +import com.java3y.austin.support.config.SupportThreadPoolConfig; +import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; +import com.java3y.austin.support.utils.LogUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -27,7 +32,6 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.util.Date; -import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -41,17 +45,22 @@ import java.util.concurrent.TimeUnit; @Service public class DingDingWorkNoticeHandler extends BaseHandler implements Handler { + @Autowired private AccountUtils accountUtils; @Autowired private StringRedisTemplate redisTemplate; + @Autowired + private LogUtils logUtils; public DingDingWorkNoticeHandler() { channelCode = ChannelType.DING_DING_WORK_NOTICE.getCode(); } - private static final String URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"; + private static final String SEND_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"; + private static final String RECALL_URL = "https://oapi.dingtalk.com/topapi/message/corpconversation/recall"; private static final String DING_DING_RECALL_KEY_PREFIX = "RECALL_"; + private static final String RECALL_BIZ_TYPE = "DingDingWorkNoticeHandler#recall"; @Override public boolean handler(TaskInfo taskInfo) { @@ -59,13 +68,15 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler { DingDingWorkNoticeAccount account = accountUtils.getAccount(taskInfo.getSendAccount(), SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, DingDingWorkNoticeAccount.class); OapiMessageCorpconversationAsyncsendV2Request request = assembleParam(account, taskInfo); String accessToken = redisTemplate.opsForValue().get(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + taskInfo.getSendAccount()); - OapiMessageCorpconversationAsyncsendV2Response response = new DefaultDingTalkClient(URL).execute(request, accessToken); + OapiMessageCorpconversationAsyncsendV2Response response = new DefaultDingTalkClient(SEND_URL).execute(request, accessToken); + + // 发送成功后记录TaskId,用于消息撤回(支持当天的) if (response.getErrcode() == 0) { - // 用于消息撤回(支持当天的) redisTemplate.opsForList().leftPush(DING_DING_RECALL_KEY_PREFIX + taskInfo.getMessageTemplateId(), String.valueOf(response.getTaskId())); redisTemplate.expire(DING_DING_RECALL_KEY_PREFIX + taskInfo.getMessageTemplateId(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000, TimeUnit.SECONDS); return true; } + // 常见的错误 应当 关联至 AnchorState,由austin后台统一透出失败原因 log.error("DingDingWorkNoticeHandler#handler fail!result:{},params:{}", JSON.toJSONString(response), JSON.toJSONString(taskInfo)); } catch (Exception e) { @@ -156,5 +167,33 @@ public class DingDingWorkNoticeHandler extends BaseHandler implements Handler { } return req; } + + + /** + * 在下发的时候存储了messageTemplate -> taskIdList + * 只要还存在taskIdList,则将其去除 + * + * @param messageTemplate + */ + @Override + public void recall(MessageTemplate messageTemplate) { + SupportThreadPoolConfig.getPendingSingleThreadPool().execute(() -> { + try { + DingDingWorkNoticeAccount account = accountUtils.getAccount(messageTemplate.getSendAccount(), SendAccountConstant.DING_DING_WORK_NOTICE_ACCOUNT_KEY, SendAccountConstant.DING_DING_WORK_NOTICE_PREFIX, DingDingWorkNoticeAccount.class); + String accessToken = redisTemplate.opsForValue().get(SendAccountConstant.DING_DING_ACCESS_TOKEN_PREFIX + messageTemplate.getSendAccount()); + while (redisTemplate.opsForList().size(DING_DING_RECALL_KEY_PREFIX + messageTemplate.getId()) > 0) { + String taskId = redisTemplate.opsForList().leftPop(DING_DING_RECALL_KEY_PREFIX + messageTemplate.getId()); + DingTalkClient client = new DefaultDingTalkClient(RECALL_URL); + OapiMessageCorpconversationRecallRequest req = new OapiMessageCorpconversationRecallRequest(); + req.setAgentId(Long.valueOf(account.getAgentId())); + req.setMsgTaskId(Long.valueOf(taskId)); + OapiMessageCorpconversationRecallResponse rsp = client.execute(req, accessToken); + logUtils.print(LogParam.builder().bizType(RECALL_BIZ_TYPE).object(JSON.toJSONString(rsp)).build()); + } + } catch (Exception e) { + log.error("DingDingWorkNoticeHandler#recall fail:{}", Throwables.getStackTraceAsString(e)); + } + }); + } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java index f6e8a25..71556cd 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EmailHandler.java @@ -13,6 +13,7 @@ import com.java3y.austin.handler.enums.RateLimitStrategy; import com.java3y.austin.handler.flowcontrol.FlowControlParam; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; +import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import com.sun.mail.util.MailSSLSocketFactory; import lombok.extern.slf4j.Slf4j; @@ -73,5 +74,8 @@ public class EmailHandler extends BaseHandler implements Handler { } return account; } + @Override + public void recall(MessageTemplate messageTemplate) { + } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java index c2e9bbf..395fa2e 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/EnterpriseWeChatHandler.java @@ -10,6 +10,7 @@ import com.java3y.austin.common.dto.model.EnterpriseWeChatContentModel; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; +import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.common.error.WxMpErrorMsgEnum; @@ -97,6 +98,10 @@ public class EnterpriseWeChatHandler extends BaseHandler implements Handler { .content(enterpriseWeChatContentModel.getContent()) .build(); } + @Override + public void recall(MessageTemplate messageTemplate) { + + } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java index f02044a..7b97b24 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/MiniProgramAccountHandler.java @@ -9,6 +9,7 @@ import com.java3y.austin.handler.domain.wechat.WeChatMiniProgramParam; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.handler.wechat.MiniProgramAccountService; +import com.java3y.austin.support.domain.MessageTemplate; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -59,6 +60,9 @@ public class MiniProgramAccountHandler extends BaseHandler implements Handler { miniProgramParam.setData(contentModel.getMap()); return miniProgramParam; } + @Override + public void recall(MessageTemplate messageTemplate) { + } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java index 1246043..59f8b90 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/OfficialAccountHandler.java @@ -9,6 +9,7 @@ import com.java3y.austin.handler.domain.wechat.WeChatOfficialParam; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.handler.wechat.OfficialAccountService; +import com.java3y.austin.support.domain.MessageTemplate; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -53,5 +54,9 @@ public class OfficialAccountHandler extends BaseHandler implements Handler { return false; } + @Override + public void recall(MessageTemplate messageTemplate) { + + } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java index 93feeef..fed2a1d 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/PushHandler.java @@ -19,6 +19,7 @@ import com.java3y.austin.handler.domain.push.getui.SendPushParam; import com.java3y.austin.handler.domain.push.getui.SendPushResult; import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; +import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.AccountUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -160,5 +161,8 @@ public class PushHandler extends BaseHandler implements Handler { } return param; } + @Override + public void recall(MessageTemplate messageTemplate) { + } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java index 46a2d90..7e8b336 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/impl/SmsHandler.java @@ -17,6 +17,7 @@ import com.java3y.austin.handler.handler.BaseHandler; import com.java3y.austin.handler.handler.Handler; import com.java3y.austin.handler.script.SmsScriptHolder; import com.java3y.austin.support.dao.SmsRecordDao; +import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.domain.SmsRecord; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -156,5 +157,8 @@ public class SmsHandler extends BaseHandler implements Handler { } } + @Override + public void recall(MessageTemplate messageTemplate) { + } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java index 305c0e5..29465ea 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java @@ -6,9 +6,11 @@ import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.LogParam; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.AnchorState; +import com.java3y.austin.handler.handler.HandlerHolder; import com.java3y.austin.handler.pending.Task; import com.java3y.austin.handler.pending.TaskPendingHolder; import com.java3y.austin.handler.utils.GroupIdMappingUtils; +import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.LogUtils; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -33,6 +35,7 @@ import java.util.Optional; @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class Receiver { private static final String LOG_BIZ_TYPE = "Receiver#consumer"; + private static final String LOG_BIZ_RECALL_TYPE = "Receiver#recall"; @Autowired private ApplicationContext context; @@ -42,13 +45,20 @@ public class Receiver { @Autowired private LogUtils logUtils; + @Autowired + private HandlerHolder handlerHolder; + + /** + * 发送消息 + * @param consumerRecord + * @param topicGroupId + */ @KafkaListener(topics = "#{'${austin.business.topic.name}'}") public void consumer(ConsumerRecord consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) { Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); if (kafkaMessage.isPresent()) { List taskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class); - String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator())); /** @@ -63,4 +73,18 @@ public class Receiver { } } } + + /** + * 撤回消息 + * @param consumerRecord + */ + @KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}") + public void recall(ConsumerRecord consumerRecord){ + Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); + if(kafkaMessage.isPresent()){ + MessageTemplate messageTemplate = JSON.parseObject(kafkaMessage.get(), MessageTemplate.class); + logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(messageTemplate).build()); + handlerHolder.route(messageTemplate.getSendChannel()).recall(messageTemplate); + } + } } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java index ef0ecdf..9e0692c 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java @@ -5,6 +5,7 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.domain.TaskInfo; @@ -13,6 +14,7 @@ import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.vo.BasicResultVO; import com.java3y.austin.service.api.domain.MessageParam; +import com.java3y.austin.service.api.enums.BusinessCode; import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.dao.MessageTemplateDao; import com.java3y.austin.support.domain.MessageTemplate; @@ -50,9 +52,12 @@ public class AssembleAction implements BusinessProcess { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.TEMPLATE_NOT_FOUND)); return; } - - List taskInfos = assembleTaskInfo(sendTaskModel, messageTemplate.get()); - sendTaskModel.setTaskInfo(taskInfos); + if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) { + List taskInfos = assembleTaskInfo(sendTaskModel, messageTemplate.get()); + sendTaskModel.setTaskInfo(taskInfos); + } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) { + sendTaskModel.setMessageTemplate(messageTemplate.get()); + } } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); log.error("assemble task fail! templateId:{}, e:{}", messageTemplateId, Throwables.getStackTraceAsString(e)); diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index 8a4ad69..7711a7e 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.vo.BasicResultVO; +import com.java3y.austin.service.api.enums.BusinessCode; import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.pipeline.BusinessProcess; import com.java3y.austin.support.pipeline.ProcessContext; @@ -27,15 +28,22 @@ public class SendMqAction implements BusinessProcess { private KafkaUtils kafkaUtils; @Value("${austin.business.topic.name}") - private String topicName; + private String sendMessageTopic; + + @Value("${austin.business.recall.topic.name}") + private String austinRecall; @Override public void process(ProcessContext context) { SendTaskModel sendTaskModel = context.getProcessModel(); - String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName}); - try { - kafkaUtils.send(topicName, message); + if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) { + String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName}); + kafkaUtils.send(sendMessageTopic, message); + } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) { + String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName}); + kafkaUtils.send(austinRecall, message); + } } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java index 662888b..6dec68c 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/config/PipelineConfig.java @@ -48,6 +48,19 @@ public class PipelineConfig { return processTemplate; } + /** + * 普通发送执行流程 + * 1.组装参数 + * 2.发送MQ + * @return + */ + @Bean("recallMessageTemplate") + public ProcessTemplate recallMessageTemplate() { + ProcessTemplate processTemplate = new ProcessTemplate(); + processTemplate.setProcessList(Arrays.asList(assembleAction, sendMqAction)); + return processTemplate; + } + /** * pipeline流程控制器 * 目前暂定只有 普通发送的流程 @@ -60,6 +73,7 @@ public class PipelineConfig { ProcessController processController = new ProcessController(); Map templateConfig = new HashMap<>(4); templateConfig.put(BusinessCode.COMMON_SEND.getCode(), commonSendTemplate()); + templateConfig.put(BusinessCode.RECALL.getCode(), recallMessageTemplate()); processController.setTemplateConfig(templateConfig); return processController; } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/SendTaskModel.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/SendTaskModel.java index f0e6fe2..2182430 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/SendTaskModel.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/domain/SendTaskModel.java @@ -2,6 +2,7 @@ package com.java3y.austin.service.api.impl.domain; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.service.api.domain.MessageParam; +import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.pipeline.ProcessModel; import lombok.AllArgsConstructor; import lombok.Builder; @@ -36,5 +37,9 @@ public class SendTaskModel implements ProcessModel { */ private List taskInfo; + /** + * 撤回任务的信息 + */ + private MessageTemplate messageTemplate; } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java new file mode 100644 index 0000000..7b89e48 --- /dev/null +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java @@ -0,0 +1,37 @@ +package com.java3y.austin.service.api.impl.service; + +import com.java3y.austin.common.vo.BasicResultVO; +import com.java3y.austin.service.api.domain.SendRequest; +import com.java3y.austin.service.api.domain.SendResponse; +import com.java3y.austin.service.api.impl.domain.SendTaskModel; +import com.java3y.austin.service.api.service.RecallService; +import com.java3y.austin.support.pipeline.ProcessContext; +import com.java3y.austin.support.pipeline.ProcessController; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + + +/** + * 撤回接口 + * @author 3y + */ +@Service +public class RecallServiceImpl implements RecallService { + + @Autowired + private ProcessController processController; + + @Override + public SendResponse recall(SendRequest sendRequest) { + SendTaskModel sendTaskModel = SendTaskModel.builder() + .messageTemplateId(sendRequest.getMessageTemplateId()) + .build(); + ProcessContext context = ProcessContext.builder() + .code(sendRequest.getCode()) + .processModel(sendTaskModel) + .needBreak(false) + .response(BasicResultVO.success()).build(); + ProcessContext process = processController.process(context); + return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg()); + } +} diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendRequest.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendRequest.java index 62d06ca..3651ed2 100644 --- a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendRequest.java +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendRequest.java @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; import lombok.experimental.Accessors; /** - * 发送接口的参数 + * 发送/撤回接口的参数 * @author 3y */ @Data @@ -19,18 +19,22 @@ import lombok.experimental.Accessors; public class SendRequest { /** - * 执行业务类型(默认填写 "send") + * 执行业务类型 + * send:发送消息 + * recall:撤回消息 */ private String code; /** * 消息模板Id + * 【必填】 */ private Long messageTemplateId; /** * 消息相关的参数 + * 当业务类型为"send",必传 */ private MessageParam messageParam; diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/service/RecallService.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/RecallService.java new file mode 100644 index 0000000..95a2897 --- /dev/null +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/RecallService.java @@ -0,0 +1,22 @@ +package com.java3y.austin.service.api.service; + +import com.java3y.austin.service.api.domain.BatchSendRequest; +import com.java3y.austin.service.api.domain.SendRequest; +import com.java3y.austin.service.api.domain.SendResponse; + +/** + * 撤回接口 + * + * @author 3y + */ +public interface RecallService { + + + /** + * 根据模板ID撤回消息 + * + * @param sendRequest + * @return + */ + SendResponse recall(SendRequest sendRequest); +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/domain/MessageTemplate.java b/austin-support/src/main/java/com/java3y/austin/support/domain/MessageTemplate.java index 21d2c85..d856c88 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/domain/MessageTemplate.java +++ b/austin-support/src/main/java/com/java3y/austin/support/domain/MessageTemplate.java @@ -129,8 +129,8 @@ public class MessageTemplate implements Serializable { /** * 是否删除 - * 0:已删除 - * 1:删除 + * 0:未删除 + * 1:已删除 */ private Integer isDeleted; diff --git a/austin-web/src/main/java/com/java3y/austin/web/controller/MessageTemplateController.java b/austin-web/src/main/java/com/java3y/austin/web/controller/MessageTemplateController.java index 6c7940f..7b6dc3d 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/controller/MessageTemplateController.java +++ b/austin-web/src/main/java/com/java3y/austin/web/controller/MessageTemplateController.java @@ -11,6 +11,7 @@ import com.java3y.austin.service.api.domain.MessageParam; import com.java3y.austin.service.api.domain.SendRequest; import com.java3y.austin.service.api.domain.SendResponse; import com.java3y.austin.service.api.enums.BusinessCode; +import com.java3y.austin.service.api.service.RecallService; import com.java3y.austin.service.api.service.SendService; import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.web.service.MessageTemplateService; @@ -50,6 +51,9 @@ public class MessageTemplateController { @Autowired private SendService sendService; + @Autowired + private RecallService recallService; + @Value("${austin.business.upload.crowd.path}") private String dataPath; @@ -131,6 +135,22 @@ public class MessageTemplateController { return BasicResultVO.success(response); } + /** + * 测试发送接口 + */ + @PostMapping("recall/{id}") + @ApiOperation("/撤回消息接口") + public BasicResultVO recall(@PathVariable("id") String id) { + + SendRequest sendRequest = SendRequest.builder().code(BusinessCode.RECALL.getCode()). + messageTemplateId(Long.valueOf(id)).build(); + SendResponse response = recallService.recall(sendRequest); + if (response.getCode() != RespStatusEnum.SUCCESS.getCode()) { + return BasicResultVO.fail(response.getMsg()); + } + return BasicResultVO.success(response); + } + /** * 启动模板的定时任务 diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index 28bc18d..ee75e6f 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -52,6 +52,8 @@ spring.redis.password=${austin-redis-password} ##################### business properties ##################### austin.business.topic.name=austinBusiness +austin.business.recall.topic.name=austinRecall +austin.business.recall.group.name=recallGroupId austin.business.log.topic.name=austinLog austin.business.graylog.ip=${austin-grayLog-ip} # TODO if windows os ,replace path !