From 4264124660dc8512e7003ff36310d3979ceb1f68 Mon Sep 17 00:00:00 2001 From: 3y Date: Sat, 4 Dec 2021 17:39:51 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E8=B4=B9=E6=95=B0=E6=8D=AE=E9=9A=94?= =?UTF-8?q?=E7=A6=BB(receiver=20+=20pending=20+=20handler)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/java3y/austin/enums/ChannelType.java | 49 +++++++++++--- .../com/java3y/austin/enums/MessageType.java | 36 ++++++++-- .../main/java/com/java3y/austin/Receiver.java | 40 ----------- .../austin/config/PrototypeBeanConfig.java | 36 ++++++++++ .../austin/config/ThreadPoolConfig.java | 55 ++++++---------- .../java3y/austin/handler/EmailHandler.java | 17 +++++ .../com/java3y/austin/handler/Handler.java | 26 +++++++- .../java3y/austin/handler/HandlerHolder.java | 26 ++++++++ .../com/java3y/austin/handler/SmsHandler.java | 15 +++-- .../java/com/java3y/austin/pending/Task.java | 37 +++++++++++ .../austin/pending/TaskPendingHolder.java | 60 +++++++++++++++++ .../com/java3y/austin/receiver/Receiver.java | 53 +++++++++++++++ .../java3y/austin/receiver/ReceiverStart.java | 66 +++++++++++++++++++ .../austin/script/TencentSmsScript.java | 4 +- .../austin/utils/GroupIdMappingUtils.java | 43 ++++++++++++ .../austin/action/PreParamCheckAction.java | 7 +- .../java3y/austin/config/PipelineConfig.java | 6 +- .../austin/controller/SendController.java | 23 ++++--- austin-web/src/main/resources/application.yml | 5 +- 19 files changed, 486 insertions(+), 118 deletions(-) delete mode 100644 austin-handler/src/main/java/com/java3y/austin/Receiver.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/config/PrototypeBeanConfig.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/EmailHandler.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/HandlerHolder.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/pending/Task.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/pending/TaskPendingHolder.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/receiver/Receiver.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/receiver/ReceiverStart.java create mode 100644 austin-handler/src/main/java/com/java3y/austin/utils/GroupIdMappingUtils.java diff --git a/austin-common/src/main/java/com/java3y/austin/enums/ChannelType.java b/austin-common/src/main/java/com/java3y/austin/enums/ChannelType.java index bfb841f..944f74d 100644 --- a/austin-common/src/main/java/com/java3y/austin/enums/ChannelType.java +++ b/austin-common/src/main/java/com/java3y/austin/enums/ChannelType.java @@ -8,6 +8,7 @@ import lombok.ToString; /** * 发送渠道类型枚举 + * * @author 3y */ @Getter @@ -16,24 +17,39 @@ import lombok.ToString; public enum ChannelType { - IM(10, "IM(站内信)", ImContentModel.class), - PUSH(20, "push(通知栏)", PushContentModel.class), - SMS(30, "sms(短信)", SmsContentModel.class), - EMAIL(40, "email(邮件)", EmailContentModel.class), - OFFICIAL_ACCOUNT(50, "OfficialAccounts(服务号)", OfficialAccountsContentModel.class), - MINI_PROGRAM(60, "miniProgram(小程序)", MiniProgramContentModel.class), + IM(10, "IM(站内信)", ImContentModel.class, "im"), + PUSH(20, "push(通知栏)", PushContentModel.class, "push"), + SMS(30, "sms(短信)", SmsContentModel.class, "sms"), + EMAIL(40, "email(邮件)", EmailContentModel.class, "email"), + OFFICIAL_ACCOUNT(50, "OfficialAccounts(服务号)", OfficialAccountsContentModel.class, "official_accounts"), + MINI_PROGRAM(60, "miniProgram(小程序)", MiniProgramContentModel.class, "mini_program"), ; - /** 编码值 */ + /** + * 编码值 + */ private Integer code; - /** 描述 */ + /** + * 描述 + */ private String description; - /** 内容模型Class */ + /** + * 内容模型Class + */ private Class contentModelClass; + /** + * 英文标识 + */ + private String code_en; + /** + * 通过code获取class + * @param code + * @return + */ public static Class getChanelModelClassByCode(Integer code) { ChannelType[] values = values(); for (ChannelType value : values) { @@ -43,4 +59,19 @@ public enum ChannelType { } return null; } + + /** + * 通过code获取enum + * @param code + * @return + */ + public static ChannelType getEnumByCode(Integer code) { + ChannelType[] values = values(); + for (ChannelType value : values) { + if (value.getCode().equals(code)) { + return value; + } + } + return null; + } } diff --git a/austin-common/src/main/java/com/java3y/austin/enums/MessageType.java b/austin-common/src/main/java/com/java3y/austin/enums/MessageType.java index 3ca69f5..ad354f8 100644 --- a/austin-common/src/main/java/com/java3y/austin/enums/MessageType.java +++ b/austin-common/src/main/java/com/java3y/austin/enums/MessageType.java @@ -12,13 +12,41 @@ import lombok.ToString; @ToString @AllArgsConstructor public enum MessageType { - NOTICE(10,"通知类消息"), - MARKETING(20,"营销类消息"), - AUTH_CODE(30,"验证码消息") + NOTICE(10,"通知类消息","notice"), + MARKETING(20,"营销类消息","marketing"), + AUTH_CODE(30,"验证码消息","auth_code") ; - + /** + * 编码值 + */ private Integer code; + + /** + * 描述 + */ private String description; + /** + * 英文标识 + */ + private String code_en; + + + /** + * 通过code获取enum + * @param code + * @return + */ + public static MessageType getEnumByCode(Integer code) { + MessageType[] values = values(); + for (MessageType value : values) { + if (value.getCode().equals(code)) { + return value; + } + } + return null; + } + + } diff --git a/austin-handler/src/main/java/com/java3y/austin/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/Receiver.java deleted file mode 100644 index e3f0cdd..0000000 --- a/austin-handler/src/main/java/com/java3y/austin/Receiver.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.java3y.austin; - -import com.alibaba.fastjson.JSON; -import com.java3y.austin.domain.TaskInfo; -import com.java3y.austin.handler.SmsHandler; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.Optional; - -/** - * @author 3y - * 消费MQ的消息 - */ -@Component -@Slf4j -public class Receiver { - - @Autowired - private SmsHandler smsHandler; - - @KafkaListener(topics = "#{'${austin.topic.name}'}", groupId = "austin") - public void consumer(ConsumerRecord consumerRecord) { - Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); - if (kafkaMessage.isPresent()) { - List lists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class); - for (TaskInfo taskInfo : lists) { - smsHandler.doHandler(taskInfo); - } - log.info("receiver message:{}", JSON.toJSONString(lists)); - } - - } - -} diff --git a/austin-handler/src/main/java/com/java3y/austin/config/PrototypeBeanConfig.java b/austin-handler/src/main/java/com/java3y/austin/config/PrototypeBeanConfig.java new file mode 100644 index 0000000..7f30604 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/config/PrototypeBeanConfig.java @@ -0,0 +1,36 @@ +package com.java3y.austin.config; + +import com.java3y.austin.pending.Task; +import com.java3y.austin.receiver.Receiver; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Scope; + +/** + * Handler模块的配置信息 + * + * @author 3y + */ +@Configuration +public class PrototypeBeanConfig { + + /** + * 定义多例的Receiver + */ + @Bean + @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) + public Receiver receiver() { + return new Receiver(); + } + + /** + * 定义多例的Task + */ + @Bean + @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) + public Task task() { + return new Task(); + } + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/config/ThreadPoolConfig.java b/austin-handler/src/main/java/com/java3y/austin/config/ThreadPoolConfig.java index 1893387..b557a82 100644 --- a/austin-handler/src/main/java/com/java3y/austin/config/ThreadPoolConfig.java +++ b/austin-handler/src/main/java/com/java3y/austin/config/ThreadPoolConfig.java @@ -1,52 +1,35 @@ package com.java3y.austin.config; import cn.hutool.core.thread.ExecutorBuilder; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.annotation.EnableAsync; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** - * 线程池配置信息 + * 线程池配置 * @author 3y */ -@Configuration -@EnableAsync public class ThreadPoolConfig { - @Bean("smsThreadPool") - public static ExecutorService getSmsThreadPool() { + /** + * @param coreSize + * @param maxSize + * @param queueSize + * 阻塞队列满了,也不丢弃任务 CallerRunsPolicy 策略 + * @return + */ + public static ExecutorService getThreadPool(Integer coreSize, Integer maxSize, Integer queueSize) { ThreadPoolExecutor threadPoolExecutor = ExecutorBuilder.create() - .setCorePoolSize(4) - .setMaxPoolSize(4) - .setKeepAliveTime(60) - .setWorkQueue(new LinkedBlockingQueue<>(1000)) - .setHandler((r, executor) -> { - try { - executor.getQueue().put(r); - } catch (InterruptedException e) { - } - }) + .setCorePoolSize(coreSize) + .setMaxPoolSize(maxSize) + .setKeepAliveTime(60, TimeUnit.SECONDS) + .setWorkQueue(new LinkedBlockingQueue<>(queueSize)) + .setHandler(new ThreadPoolExecutor.CallerRunsPolicy()) .build(); - return threadPoolExecutor; } - @Bean("emailThreadPoll") - public static ExecutorService getEmailThreadPool() { - ThreadPoolExecutor threadPoolExecutor = ExecutorBuilder.create() - .setCorePoolSize(2) - .setMaxPoolSize(2) - .setKeepAliveTime(60) - .setWorkQueue(new LinkedBlockingQueue<>(1000)) - .setHandler((r, executor) -> { - try { - executor.getQueue().put(r); - } catch (InterruptedException e) { - } - }) - .build(); - return threadPoolExecutor; - } + } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/EmailHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/EmailHandler.java new file mode 100644 index 0000000..fdf55a8 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/EmailHandler.java @@ -0,0 +1,17 @@ +package com.java3y.austin.handler; + +import com.java3y.austin.domain.TaskInfo; +import org.springframework.stereotype.Component; + +/** + * 邮件发送处理 + * + * @author 3y + */ +@Component +public class EmailHandler extends Handler { + + @Override + public void handler(TaskInfo taskInfoList) { + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/Handler.java b/austin-handler/src/main/java/com/java3y/austin/handler/Handler.java index b1e98ba..7e322b9 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/Handler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/Handler.java @@ -1,12 +1,33 @@ package com.java3y.austin.handler; import com.java3y.austin.domain.TaskInfo; +import com.java3y.austin.enums.ChannelType; +import org.springframework.beans.factory.annotation.Autowired; + +import javax.annotation.PostConstruct; /** * @author 3y * 发送各个渠道的handler */ -public interface Handler { +public abstract class Handler { + + @Autowired + private HandlerHolder handlerHolder; + + /** + * 初始化渠道与Handler的映射关系 + */ + @PostConstruct + private void init() { + for (ChannelType channelType : ChannelType.values()) { + handlerHolder.putHandler(channelType.getCode(), this); + } + } + + public void doHandler(TaskInfo taskInfo) { + handler(taskInfo); + } /** * 统一处理的handler接口 @@ -14,7 +35,6 @@ public interface Handler { * @param taskInfo * @return */ - boolean doHandler(TaskInfo taskInfo); - + public abstract void handler(TaskInfo taskInfo); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/HandlerHolder.java b/austin-handler/src/main/java/com/java3y/austin/handler/HandlerHolder.java new file mode 100644 index 0000000..29b523a --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/HandlerHolder.java @@ -0,0 +1,26 @@ +package com.java3y.austin.handler; + + +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +/** + * channel->Handler的映射关系 + * + * @author 3y + */ +@Component +public class HandlerHolder { + + private Map handlers = new HashMap(32); + + public void putHandler(Integer channelCode, Handler handler) { + handlers.put(channelCode, handler); + } + + public Handler route(Integer channelCode) { + return handlers.get(channelCode); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/SmsHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/SmsHandler.java index cc676f6..7ddcb3b 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/SmsHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/SmsHandler.java @@ -3,10 +3,9 @@ package com.java3y.austin.handler; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.java3y.austin.dao.SmsRecordDao; -import com.java3y.austin.domain.SmsRecord; import com.java3y.austin.domain.SmsParam; +import com.java3y.austin.domain.SmsRecord; import com.java3y.austin.domain.TaskInfo; -import com.java3y.austin.dto.ContentModel; import com.java3y.austin.dto.SmsContentModel; import com.java3y.austin.script.SmsScript; import org.springframework.beans.factory.annotation.Autowired; @@ -15,10 +14,12 @@ import org.springframework.stereotype.Component; import java.util.List; /** + * 短信发送处理 + * * @author 3y */ @Component -public class SmsHandler implements Handler { +public class SmsHandler extends Handler { @Autowired private SmsRecordDao smsRecordDao; @@ -26,8 +27,9 @@ public class SmsHandler implements Handler { @Autowired private SmsScript smsScript; + @Override - public boolean doHandler(TaskInfo taskInfo) { + public void handler(TaskInfo taskInfo) { SmsParam smsParam = SmsParam.builder() .phones(taskInfo.getReceiver()) @@ -39,10 +41,7 @@ public class SmsHandler implements Handler { if (!CollUtil.isEmpty(recordList)) { smsRecordDao.saveAll(recordList); - return true; } - - return false; } @@ -60,4 +59,6 @@ public class SmsHandler implements Handler { return smsContentModel.getContent(); } } + + } diff --git a/austin-handler/src/main/java/com/java3y/austin/pending/Task.java b/austin-handler/src/main/java/com/java3y/austin/pending/Task.java new file mode 100644 index 0000000..42d4e6a --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/pending/Task.java @@ -0,0 +1,37 @@ +package com.java3y.austin.pending; + + +import com.java3y.austin.domain.TaskInfo; +import com.java3y.austin.handler.HandlerHolder; +import lombok.Data; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * Task 执行器 + * 1.通用去重功能 + * 2.发送消息 + * + * @author 3y + */ +@Data +@Accessors(chain = true) +@Slf4j +public class Task implements Runnable { + + @Autowired + private HandlerHolder handlerHolder; + + private TaskInfo taskInfo; + + @Override + public void run() { + + // 1. TODO 通用去重 + + // 2. 真正发送消息 + handlerHolder.route(taskInfo.getSendChannel()) + .doHandler(taskInfo); + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/pending/TaskPendingHolder.java b/austin-handler/src/main/java/com/java3y/austin/pending/TaskPendingHolder.java new file mode 100644 index 0000000..31b6b4c --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/pending/TaskPendingHolder.java @@ -0,0 +1,60 @@ +package com.java3y.austin.pending; + +import com.java3y.austin.config.ThreadPoolConfig; +import com.java3y.austin.utils.GroupIdMappingUtils; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + + +/** + * 存储 每种消息类型 与 TaskPending 的关系 + * @author 3y + */ +@Component +public class TaskPendingHolder { + + private Map taskPendingHolder = new HashMap<>(32); + + /** + * 获取得到所有的groupId + */ + private static List groupIds = GroupIdMappingUtils.getAllGroupIds(); + + + /** + * 线程池的参数 + */ + private Integer coreSize = 3; + private Integer maxSize = 3; + private Integer queueSize = 100; + + + /** + * 给每个渠道,每种消息类型初始化一个线程池 + * + * TODO 不同的 groupId 分配不同的线程和队列大小 + * + */ + @PostConstruct + public void init() { + for (String groupId : groupIds) { + taskPendingHolder.put(groupId, ThreadPoolConfig.getThreadPool(coreSize, maxSize, queueSize)); + } + } + + /** + * 得到对应的线程池 + * @param groupId + * @return + */ + public ExecutorService route(String groupId) { + return taskPendingHolder.get(groupId); + } + + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/receiver/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/receiver/Receiver.java new file mode 100644 index 0000000..2859b49 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/receiver/Receiver.java @@ -0,0 +1,53 @@ +package com.java3y.austin.receiver; + +import com.alibaba.fastjson.JSON; +import com.java3y.austin.domain.TaskInfo; +import com.java3y.austin.pending.Task; +import com.java3y.austin.pending.TaskPendingHolder; +import com.java3y.austin.utils.GroupIdMappingUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; + +import java.util.List; +import java.util.Optional; + +/** + * @author 3y + * 消费MQ的消息 + */ +@Slf4j +public class Receiver { + + @Autowired + private ApplicationContext context; + + @Autowired + private TaskPendingHolder taskPendingHolder; + + @KafkaListener(topics = "#{'${austin.topic.name}'}") + public void consumer(ConsumerRecord consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) { + Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); + if (kafkaMessage.isPresent()) { + List TaskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class); + String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(TaskInfoLists.get(0)); + + /** + * 每个消费者组 只消费 他们自身关心的消息 + */ + if (topicGroupId.equals(messageGroupId)) { + for (TaskInfo taskInfo : TaskInfoLists) { + Task task = context.getBean(Task.class).setTaskInfo(taskInfo); + taskPendingHolder.route(topicGroupId).execute(task); + } + } + } + + } + + +} diff --git a/austin-handler/src/main/java/com/java3y/austin/receiver/ReceiverStart.java b/austin-handler/src/main/java/com/java3y/austin/receiver/ReceiverStart.java new file mode 100644 index 0000000..2e2d0c8 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/receiver/ReceiverStart.java @@ -0,0 +1,66 @@ +package com.java3y.austin.receiver; + +import com.java3y.austin.utils.GroupIdMappingUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.lang.reflect.Method; +import java.util.List; + +/** + * 启动消费者 + * + * @author 3y + * @date 2021/12/4 + */ +@Service +public class ReceiverStart { + @Autowired + private ApplicationContext context; + + /** + * receiver的消费方法常量 + */ + private static final String RECEIVER_METHOD_NAME = "Receiver.consumer"; + + /** + * 获取得到所有的groupId + */ + private static List groupIds = GroupIdMappingUtils.getAllGroupIds(); + + /** + * 下标(用于迭代groupIds位置) + */ + private static Integer index = 0; + + /** + * 为每个渠道不同的消息类型 创建一个Receiver对象 + */ + @PostConstruct + public void init() { + for (int i = 0; i < groupIds.size(); i++) { + context.getBean(Receiver.class); + } + } + + /** + * 给每个Receiver对象的consumer方法 @KafkaListener赋值相应的groupId + */ + @Bean + public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer() { + return (attrs, element) -> { + if (element instanceof Method) { + String name = ((Method) element).getDeclaringClass().getSimpleName() + "." + ((Method) element).getName(); + if (RECEIVER_METHOD_NAME.equals(name)) { + attrs.put("groupId", groupIds.get(index)); + index++; + } + } + return attrs; + }; + } +} diff --git a/austin-handler/src/main/java/com/java3y/austin/script/TencentSmsScript.java b/austin-handler/src/main/java/com/java3y/austin/script/TencentSmsScript.java index a545fd2..8602aa8 100644 --- a/austin-handler/src/main/java/com/java3y/austin/script/TencentSmsScript.java +++ b/austin-handler/src/main/java/com/java3y/austin/script/TencentSmsScript.java @@ -6,11 +6,10 @@ import cn.hutool.core.util.IdUtil; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; import com.java3y.austin.constant.AustinConstant; +import com.java3y.austin.domain.SmsParam; import com.java3y.austin.domain.SmsRecord; import com.java3y.austin.enums.SmsStatus; -import com.java3y.austin.domain.SmsParam; import com.tencentcloudapi.common.Credential; -import com.tencentcloudapi.common.exception.TencentCloudSDKException; import com.tencentcloudapi.common.profile.ClientProfile; import com.tencentcloudapi.common.profile.HttpProfile; import com.tencentcloudapi.sms.v20210111.SmsClient; @@ -70,7 +69,6 @@ public class TencentSmsScript implements SmsScript { SendSmsResponse response = client.SendSms(request); return assembleSmsRecord(smsParam,response); - } catch (Exception e) { log.error("send tencent sms fail!{},params:{}", Throwables.getStackTraceAsString(e), JSON.toJSONString(smsParam)); diff --git a/austin-handler/src/main/java/com/java3y/austin/utils/GroupIdMappingUtils.java b/austin-handler/src/main/java/com/java3y/austin/utils/GroupIdMappingUtils.java new file mode 100644 index 0000000..918dfc8 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/utils/GroupIdMappingUtils.java @@ -0,0 +1,43 @@ +package com.java3y.austin.utils; + + +import com.java3y.austin.domain.TaskInfo; +import com.java3y.austin.enums.ChannelType; +import com.java3y.austin.enums.MessageType; + +import java.util.ArrayList; +import java.util.List; + +/** + * groupId 标识着每一个消费者组 + * + * @author 3y + */ +public class GroupIdMappingUtils { + + /** + * 获取所有的groupIds + * (不同的渠道不同的消息类型拥有自己的groupId) + */ + public static List getAllGroupIds() { + List groupIds = new ArrayList<>(); + for (ChannelType channelType : ChannelType.values()) { + for (MessageType messageType : MessageType.values()) { + groupIds.add(channelType.getCode_en() + "." + messageType.getCode_en()); + } + } + return groupIds; + } + + + /** + * 根据TaskInfo获取当前消息的groupId + * @param taskInfo + * @return + */ + public static String getGroupIdByTaskInfo(TaskInfo taskInfo) { + String channelCodeEn = ChannelType.getEnumByCode(taskInfo.getSendChannel()).getCode_en(); + String msgCodeEn = MessageType.getEnumByCode(taskInfo.getMsgType()).getCode_en(); + return channelCodeEn + "." + msgCodeEn; + } +} diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/action/PreParamCheckAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/action/PreParamCheckAction.java index f088818..79df995 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/action/PreParamCheckAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/action/PreParamCheckAction.java @@ -1,11 +1,10 @@ package com.java3y.austin.action; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; -import com.java3y.austin.enums.RespStatusEnum; import com.java3y.austin.domain.MessageParam; import com.java3y.austin.domain.SendTaskModel; +import com.java3y.austin.enums.RespStatusEnum; import com.java3y.austin.pipeline.BusinessProcess; import com.java3y.austin.pipeline.ProcessContext; import com.java3y.austin.vo.BasicResultVO; @@ -29,13 +28,13 @@ public class PreParamCheckAction implements BusinessProcess { Long messageTemplateId = sendTaskModel.getMessageTemplateId(); List messageParamList = sendTaskModel.getMessageParamList(); - // 没有传入 消息模板Id 和 messageParam + // 没有传入 消息模板Id 或者 messageParam if (messageTemplateId == null || CollUtil.isEmpty(messageParamList)) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS)); return; } - // 过滤接收者为null的messageParam + // 过滤 receiver=null 的messageParam List resultMessageParamList = messageParamList.stream() .filter(messageParam -> !StrUtil.isBlank(messageParam.getReceiver())) .collect(Collectors.toList()); diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/config/PipelineConfig.java b/austin-service-api-impl/src/main/java/com/java3y/austin/config/PipelineConfig.java index acbe01e..1b85bf7 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/config/PipelineConfig.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/config/PipelineConfig.java @@ -25,10 +25,10 @@ public class PipelineConfig { /** * 普通发送执行流程 - * 1. 参数校验 + * 1. 前置参数校验 * 2. 组装参数 - * 3. 发送消息至MQ - * + * 3. 后置参数校验 + * 4. 发送消息至MQ * @return */ @Bean("commonSendTemplate") diff --git a/austin-web/src/main/java/com/java3y/austin/controller/SendController.java b/austin-web/src/main/java/com/java3y/austin/controller/SendController.java index ea295c7..3ac6dac 100644 --- a/austin-web/src/main/java/com/java3y/austin/controller/SendController.java +++ b/austin-web/src/main/java/com/java3y/austin/controller/SendController.java @@ -22,30 +22,37 @@ public class SendController { @Autowired private SendService sendService; - @GetMapping("/sendSmsV2") - public SendResponse sendSmsV2(String phone) { + /** + * 发送 + * + * @param phone + * @return + */ + @GetMapping("/sendSmsTest") + public SendResponse sendSmsTest(String phone, Long templateId) { /** * - * messageTemplate Id 为1 的模板内容 + * messageTemplate Id 为1 的模板内容(普通短信) * {"auditStatus":10,"auditor":"yyyyyyz","created":1636978066,"creator":"yyyyc","deduplicationTime":1,"expectPushTime":"0","flowId":"yyyy","id":1,"idType":30,"isDeleted":0,"isNightShield":0,"msgContent":"{\"content\":\"{$contentValue}\"}","msgStatus":10,"msgType":10,"name":"test短信","proposer":"yyyy22","sendAccount":66,"sendChannel":30,"team":"yyyt","templateType":10,"updated":1636978066,"updator":"yyyyu"} * + * messageTemplate Id 为2 的模板内容(营销短信) + * {"auditStatus":10,"auditor":"yyyyyyz","created":1636978066,"creator":"yyyyc","deduplicationTime":1,"expectPushTime":"0","flowId":"yyyy","id":1,"idType":30,"isDeleted":0,"isNightShield":0,"msgContent":"{\"content\":\"{$contentValue}\"}","msgStatus":10,"msgType":20,"name":"test短信","proposer":"yyyy22","sendAccount":66,"sendChannel":30,"team":"yyyt","templateType":10,"updated":1636978066,"updator":"yyyyu"} */ // 文案参数 Map variables = new HashMap<>(8); variables.put("contentValue", "6666"); - MessageParam messageParam = new MessageParam().setReceiver(phone).setVariables(variables); - // ID为1的消息模板 + SendRequest sendRequest = new SendRequest().setCode(BusinessCode.COMMON_SEND.getCode()) - .setMessageTemplateId(1L) + .setMessageTemplateId(templateId) .setMessageParam(messageParam); - SendResponse response = sendService.send(sendRequest); + return sendService.send(sendRequest); - return response; } + } diff --git a/austin-web/src/main/resources/application.yml b/austin-web/src/main/resources/application.yml index 4ecbc77..f14aeff 100644 --- a/austin-web/src/main/resources/application.yml +++ b/austin-web/src/main/resources/application.yml @@ -27,7 +27,7 @@ spring: driver-class-name: # kafka相关的信息配置 TODO kafka: - bootstrap-servers: + bootstrap-servers: 119.91.205.248:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer @@ -37,6 +37,9 @@ spring: auto: offset: reset: earliest + auto-commit-interval: 1000 + enable-auto-commit: true + # tomcat / HikariPool(数据库连接池 配置) TODO # 消息topicName TODO