消费数据隔离(receiver + pending + handler)

pull/2/head
3y 3 years ago
parent 0d6b3ebcea
commit 4264124660

@ -8,6 +8,7 @@ import lombok.ToString;
/**
*
*
* @author 3y
*/
@Getter
@ -16,24 +17,39 @@ import lombok.ToString;
public enum ChannelType {
IM(10, "IM(站内信)", ImContentModel.class),
PUSH(20, "push(通知栏)", PushContentModel.class),
SMS(30, "sms(短信)", SmsContentModel.class),
EMAIL(40, "email(邮件)", EmailContentModel.class),
OFFICIAL_ACCOUNT(50, "OfficialAccounts(服务号)", OfficialAccountsContentModel.class),
MINI_PROGRAM(60, "miniProgram(小程序)", MiniProgramContentModel.class),
IM(10, "IM(站内信)", ImContentModel.class, "im"),
PUSH(20, "push(通知栏)", PushContentModel.class, "push"),
SMS(30, "sms(短信)", SmsContentModel.class, "sms"),
EMAIL(40, "email(邮件)", EmailContentModel.class, "email"),
OFFICIAL_ACCOUNT(50, "OfficialAccounts(服务号)", OfficialAccountsContentModel.class, "official_accounts"),
MINI_PROGRAM(60, "miniProgram(小程序)", MiniProgramContentModel.class, "mini_program"),
;
/** 编码值 */
/**
*
*/
private Integer code;
/** 描述 */
/**
*
*/
private String description;
/** 内容模型Class */
/**
* Class
*/
private Class contentModelClass;
/**
*
*/
private String code_en;
/**
* codeclass
* @param code
* @return
*/
public static Class getChanelModelClassByCode(Integer code) {
ChannelType[] values = values();
for (ChannelType value : values) {
@ -43,4 +59,19 @@ public enum ChannelType {
}
return null;
}
/**
* codeenum
* @param code
* @return
*/
public static ChannelType getEnumByCode(Integer code) {
ChannelType[] values = values();
for (ChannelType value : values) {
if (value.getCode().equals(code)) {
return value;
}
}
return null;
}
}

@ -12,13 +12,41 @@ import lombok.ToString;
@ToString
@AllArgsConstructor
public enum MessageType {
NOTICE(10,"通知类消息"),
MARKETING(20,"营销类消息"),
AUTH_CODE(30,"验证码消息")
NOTICE(10,"通知类消息","notice"),
MARKETING(20,"营销类消息","marketing"),
AUTH_CODE(30,"验证码消息","auth_code")
;
/**
*
*/
private Integer code;
/**
*
*/
private String description;
/**
*
*/
private String code_en;
/**
* codeenum
* @param code
* @return
*/
public static MessageType getEnumByCode(Integer code) {
MessageType[] values = values();
for (MessageType value : values) {
if (value.getCode().equals(code)) {
return value;
}
}
return null;
}
}

@ -1,40 +0,0 @@
package com.java3y.austin;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.handler.SmsHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
/**
* @author 3y
* MQ
*/
@Component
@Slf4j
public class Receiver {
@Autowired
private SmsHandler smsHandler;
@KafkaListener(topics = "#{'${austin.topic.name}'}", groupId = "austin")
public void consumer(ConsumerRecord<?, String> consumerRecord) {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
List<TaskInfo> lists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class);
for (TaskInfo taskInfo : lists) {
smsHandler.doHandler(taskInfo);
}
log.info("receiver message:{}", JSON.toJSONString(lists));
}
}
}

@ -0,0 +1,36 @@
package com.java3y.austin.config;
import com.java3y.austin.pending.Task;
import com.java3y.austin.receiver.Receiver;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
* Handler
*
* @author 3y
*/
@Configuration
public class PrototypeBeanConfig {
/**
* Receiver
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Receiver receiver() {
return new Receiver();
}
/**
* Task
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Task task() {
return new Task();
}
}

@ -1,52 +1,35 @@
package com.java3y.austin.config;
import cn.hutool.core.thread.ExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线
* 线
* @author 3y
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean("smsThreadPool")
public static ExecutorService getSmsThreadPool() {
/**
* @param coreSize
* @param maxSize
* @param queueSize
* CallerRunsPolicy
* @return
*/
public static ExecutorService getThreadPool(Integer coreSize, Integer maxSize, Integer queueSize) {
ThreadPoolExecutor threadPoolExecutor = ExecutorBuilder.create()
.setCorePoolSize(4)
.setMaxPoolSize(4)
.setKeepAliveTime(60)
.setWorkQueue(new LinkedBlockingQueue<>(1000))
.setHandler((r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
}
})
.setCorePoolSize(coreSize)
.setMaxPoolSize(maxSize)
.setKeepAliveTime(60, TimeUnit.SECONDS)
.setWorkQueue(new LinkedBlockingQueue<>(queueSize))
.setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.build();
return threadPoolExecutor;
}
@Bean("emailThreadPoll")
public static ExecutorService getEmailThreadPool() {
ThreadPoolExecutor threadPoolExecutor = ExecutorBuilder.create()
.setCorePoolSize(2)
.setMaxPoolSize(2)
.setKeepAliveTime(60)
.setWorkQueue(new LinkedBlockingQueue<>(1000))
.setHandler((r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
}
})
.build();
return threadPoolExecutor;
}
}

@ -0,0 +1,17 @@
package com.java3y.austin.handler;
import com.java3y.austin.domain.TaskInfo;
import org.springframework.stereotype.Component;
/**
*
*
* @author 3y
*/
@Component
public class EmailHandler extends Handler {
@Override
public void handler(TaskInfo taskInfoList) {
}
}

@ -1,12 +1,33 @@
package com.java3y.austin.handler;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.enums.ChannelType;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
/**
* @author 3y
* handler
*/
public interface Handler {
public abstract class Handler {
@Autowired
private HandlerHolder handlerHolder;
/**
* Handler
*/
@PostConstruct
private void init() {
for (ChannelType channelType : ChannelType.values()) {
handlerHolder.putHandler(channelType.getCode(), this);
}
}
public void doHandler(TaskInfo taskInfo) {
handler(taskInfo);
}
/**
* handler
@ -14,7 +35,6 @@ public interface Handler {
* @param taskInfo
* @return
*/
boolean doHandler(TaskInfo taskInfo);
public abstract void handler(TaskInfo taskInfo);
}

@ -0,0 +1,26 @@
package com.java3y.austin.handler;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* channel->Handler
*
* @author 3y
*/
@Component
public class HandlerHolder {
private Map<Integer, Handler> handlers = new HashMap<Integer, Handler>(32);
public void putHandler(Integer channelCode, Handler handler) {
handlers.put(channelCode, handler);
}
public Handler route(Integer channelCode) {
return handlers.get(channelCode);
}
}

@ -3,10 +3,9 @@ package com.java3y.austin.handler;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.dao.SmsRecordDao;
import com.java3y.austin.domain.SmsRecord;
import com.java3y.austin.domain.SmsParam;
import com.java3y.austin.domain.SmsRecord;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.dto.ContentModel;
import com.java3y.austin.dto.SmsContentModel;
import com.java3y.austin.script.SmsScript;
import org.springframework.beans.factory.annotation.Autowired;
@ -15,10 +14,12 @@ import org.springframework.stereotype.Component;
import java.util.List;
/**
*
*
* @author 3y
*/
@Component
public class SmsHandler implements Handler {
public class SmsHandler extends Handler {
@Autowired
private SmsRecordDao smsRecordDao;
@ -26,8 +27,9 @@ public class SmsHandler implements Handler {
@Autowired
private SmsScript smsScript;
@Override
public boolean doHandler(TaskInfo taskInfo) {
public void handler(TaskInfo taskInfo) {
SmsParam smsParam = SmsParam.builder()
.phones(taskInfo.getReceiver())
@ -39,10 +41,7 @@ public class SmsHandler implements Handler {
if (!CollUtil.isEmpty(recordList)) {
smsRecordDao.saveAll(recordList);
return true;
}
return false;
}
@ -60,4 +59,6 @@ public class SmsHandler implements Handler {
return smsContentModel.getContent();
}
}
}

@ -0,0 +1,37 @@
package com.java3y.austin.pending;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.handler.HandlerHolder;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
/**
* Task
* 1.
* 2.
*
* @author 3y
*/
@Data
@Accessors(chain = true)
@Slf4j
public class Task implements Runnable {
@Autowired
private HandlerHolder handlerHolder;
private TaskInfo taskInfo;
@Override
public void run() {
// 1. TODO 通用去重
// 2. 真正发送消息
handlerHolder.route(taskInfo.getSendChannel())
.doHandler(taskInfo);
}
}

@ -0,0 +1,60 @@
package com.java3y.austin.pending;
import com.java3y.austin.config.ThreadPoolConfig;
import com.java3y.austin.utils.GroupIdMappingUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* TaskPending
* @author 3y
*/
@Component
public class TaskPendingHolder {
private Map<String, ExecutorService> taskPendingHolder = new HashMap<>(32);
/**
* groupId
*/
private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds();
/**
* 线
*/
private Integer coreSize = 3;
private Integer maxSize = 3;
private Integer queueSize = 100;
/**
* 线
*
* TODO groupId 线
*
*/
@PostConstruct
public void init() {
for (String groupId : groupIds) {
taskPendingHolder.put(groupId, ThreadPoolConfig.getThreadPool(coreSize, maxSize, queueSize));
}
}
/**
* 线
* @param groupId
* @return
*/
public ExecutorService route(String groupId) {
return taskPendingHolder.get(groupId);
}
}

@ -0,0 +1,53 @@
package com.java3y.austin.receiver;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.pending.Task;
import com.java3y.austin.pending.TaskPendingHolder;
import com.java3y.austin.utils.GroupIdMappingUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import java.util.List;
import java.util.Optional;
/**
* @author 3y
* MQ
*/
@Slf4j
public class Receiver {
@Autowired
private ApplicationContext context;
@Autowired
private TaskPendingHolder taskPendingHolder;
@KafkaListener(topics = "#{'${austin.topic.name}'}")
public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
List<TaskInfo> TaskInfoLists = JSON.parseArray(kafkaMessage.get(), TaskInfo.class);
String messageGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(TaskInfoLists.get(0));
/**
*
*/
if (topicGroupId.equals(messageGroupId)) {
for (TaskInfo taskInfo : TaskInfoLists) {
Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
taskPendingHolder.route(topicGroupId).execute(task);
}
}
}
}
}

@ -0,0 +1,66 @@
package com.java3y.austin.receiver;
import com.java3y.austin.utils.GroupIdMappingUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.util.List;
/**
*
*
* @author 3y
* @date 2021/12/4
*/
@Service
public class ReceiverStart {
@Autowired
private ApplicationContext context;
/**
* receiver
*/
private static final String RECEIVER_METHOD_NAME = "Receiver.consumer";
/**
* groupId
*/
private static List<String> groupIds = GroupIdMappingUtils.getAllGroupIds();
/**
* (groupIds)
*/
private static Integer index = 0;
/**
* Receiver
*/
@PostConstruct
public void init() {
for (int i = 0; i < groupIds.size(); i++) {
context.getBean(Receiver.class);
}
}
/**
* Receiverconsumer @KafkaListenergroupId
*/
@Bean
public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
if (element instanceof Method) {
String name = ((Method) element).getDeclaringClass().getSimpleName() + "." + ((Method) element).getName();
if (RECEIVER_METHOD_NAME.equals(name)) {
attrs.put("groupId", groupIds.get(index));
index++;
}
}
return attrs;
};
}
}

@ -6,11 +6,10 @@ import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.constant.AustinConstant;
import com.java3y.austin.domain.SmsParam;
import com.java3y.austin.domain.SmsRecord;
import com.java3y.austin.enums.SmsStatus;
import com.java3y.austin.domain.SmsParam;
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import com.tencentcloudapi.common.profile.ClientProfile;
import com.tencentcloudapi.common.profile.HttpProfile;
import com.tencentcloudapi.sms.v20210111.SmsClient;
@ -70,7 +69,6 @@ public class TencentSmsScript implements SmsScript {
SendSmsResponse response = client.SendSms(request);
return assembleSmsRecord(smsParam,response);
} catch (Exception e) {
log.error("send tencent sms fail!{},params:{}",
Throwables.getStackTraceAsString(e), JSON.toJSONString(smsParam));

@ -0,0 +1,43 @@
package com.java3y.austin.utils;
import com.java3y.austin.domain.TaskInfo;
import com.java3y.austin.enums.ChannelType;
import com.java3y.austin.enums.MessageType;
import java.util.ArrayList;
import java.util.List;
/**
* groupId
*
* @author 3y
*/
public class GroupIdMappingUtils {
/**
* groupIds
* (groupId)
*/
public static List<String> getAllGroupIds() {
List<String> groupIds = new ArrayList<>();
for (ChannelType channelType : ChannelType.values()) {
for (MessageType messageType : MessageType.values()) {
groupIds.add(channelType.getCode_en() + "." + messageType.getCode_en());
}
}
return groupIds;
}
/**
* TaskInfogroupId
* @param taskInfo
* @return
*/
public static String getGroupIdByTaskInfo(TaskInfo taskInfo) {
String channelCodeEn = ChannelType.getEnumByCode(taskInfo.getSendChannel()).getCode_en();
String msgCodeEn = MessageType.getEnumByCode(taskInfo.getMsgType()).getCode_en();
return channelCodeEn + "." + msgCodeEn;
}
}

@ -1,11 +1,10 @@
package com.java3y.austin.action;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.java3y.austin.enums.RespStatusEnum;
import com.java3y.austin.domain.MessageParam;
import com.java3y.austin.domain.SendTaskModel;
import com.java3y.austin.enums.RespStatusEnum;
import com.java3y.austin.pipeline.BusinessProcess;
import com.java3y.austin.pipeline.ProcessContext;
import com.java3y.austin.vo.BasicResultVO;
@ -29,13 +28,13 @@ public class PreParamCheckAction implements BusinessProcess {
Long messageTemplateId = sendTaskModel.getMessageTemplateId();
List<MessageParam> messageParamList = sendTaskModel.getMessageParamList();
// 没有传入 消息模板Id messageParam
// 没有传入 消息模板Id 或者 messageParam
if (messageTemplateId == null || CollUtil.isEmpty(messageParamList)) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS));
return;
}
// 过滤接收者为null的messageParam
// 过滤 receiver=null 的messageParam
List<MessageParam> resultMessageParamList = messageParamList.stream()
.filter(messageParam -> !StrUtil.isBlank(messageParam.getReceiver()))
.collect(Collectors.toList());

@ -25,10 +25,10 @@ public class PipelineConfig {
/**
*
* 1.
* 1.
* 2.
* 3. MQ
*
* 3.
* 4. MQ
* @return
*/
@Bean("commonSendTemplate")

@ -22,30 +22,37 @@ public class SendController {
@Autowired
private SendService sendService;
@GetMapping("/sendSmsV2")
public SendResponse sendSmsV2(String phone) {
/**
*
*
* @param phone
* @return
*/
@GetMapping("/sendSmsTest")
public SendResponse sendSmsTest(String phone, Long templateId) {
/**
*
* messageTemplate Id 1
* messageTemplate Id 1
* {"auditStatus":10,"auditor":"yyyyyyz","created":1636978066,"creator":"yyyyc","deduplicationTime":1,"expectPushTime":"0","flowId":"yyyy","id":1,"idType":30,"isDeleted":0,"isNightShield":0,"msgContent":"{\"content\":\"{$contentValue}\"}","msgStatus":10,"msgType":10,"name":"test短信","proposer":"yyyy22","sendAccount":66,"sendChannel":30,"team":"yyyt","templateType":10,"updated":1636978066,"updator":"yyyyu"}
*
* messageTemplate Id 2
* {"auditStatus":10,"auditor":"yyyyyyz","created":1636978066,"creator":"yyyyc","deduplicationTime":1,"expectPushTime":"0","flowId":"yyyy","id":1,"idType":30,"isDeleted":0,"isNightShield":0,"msgContent":"{\"content\":\"{$contentValue}\"}","msgStatus":10,"msgType":20,"name":"test短信","proposer":"yyyy22","sendAccount":66,"sendChannel":30,"team":"yyyt","templateType":10,"updated":1636978066,"updator":"yyyyu"}
*/
// 文案参数
Map<String, String> variables = new HashMap<>(8);
variables.put("contentValue", "6666");
MessageParam messageParam = new MessageParam().setReceiver(phone).setVariables(variables);
// ID为1的消息模板
SendRequest sendRequest = new SendRequest().setCode(BusinessCode.COMMON_SEND.getCode())
.setMessageTemplateId(1L)
.setMessageTemplateId(templateId)
.setMessageParam(messageParam);
SendResponse response = sendService.send(sendRequest);
return sendService.send(sendRequest);
return response;
}
}

@ -27,7 +27,7 @@ spring:
driver-class-name:
# kafka相关的信息配置 TODO
kafka:
bootstrap-servers:
bootstrap-servers: 119.91.205.248:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
@ -37,6 +37,9 @@ spring:
auto:
offset:
reset: earliest
auto-commit-interval: 1000
enable-auto-commit: true
# tomcat / HikariPool(数据库连接池 配置) TODO
# 消息topicName TODO

Loading…
Cancel
Save