From 608311647119d96456f375b32c36b3c65da99a31 Mon Sep 17 00:00:00 2001 From: "sky.huang" Date: Thu, 13 Jul 2023 14:37:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BE=9B=20=E4=B8=9A=E5=8A=A1=20bizId?= =?UTF-8?q?=20=E5=92=8C=20messageId=20=E6=B3=A8=E5=85=A5,=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=9F=BA=E4=BA=8E=20messageId=20=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E7=BB=93=E6=9E=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/constant/AustinConstant.java | 4 + .../austin/common/domain/AnchorInfo.java | 10 ++ .../austin/common/domain/SimpleTaskInfo.java | 36 ++++++ .../java3y/austin/common/domain/TaskInfo.java | 11 ++ .../service/AbstractDeduplicationService.java | 2 +- .../discard/DiscardMessageService.java | 2 +- .../austin/handler/handler/BaseHandler.java | 4 +- .../service/impl/ConsumeServiceImpl.java | 2 +- .../shield/impl/ShieldServiceImpl.java | 4 +- .../impl/action/AfterParamCheckAction.java | 6 +- .../api/impl/action/AssembleAction.java | 6 + .../api/impl/action/PreParamCheckAction.java | 4 +- .../api/impl/service/RecallServiceImpl.java | 4 +- .../api/impl/service/SendServiceImpl.java | 8 +- .../api/impl/service/TraceServiceImpl.java | 49 ++++++++ .../service/api/domain/MessageParam.java | 5 + .../service/api/domain/SendResponse.java | 9 +- .../service/api/domain/TraceResponse.java | 35 ++++++ .../service/api/service/TraceService.java | 20 ++++ .../java3y/austin/stream/sink/AustinSink.java | 15 ++- .../austin/support/utils/TaskInfoUtils.java | 8 ++ .../austin/web/controller/DataController.java | 9 ++ .../austin/web/service/DataService.java | 8 ++ .../web/service/impl/DataServiceImpl.java | 107 ++++++++++-------- .../com/java3y/austin/web/vo/DataParam.java | 6 + 25 files changed, 311 insertions(+), 63 deletions(-) create mode 100644 austin-common/src/main/java/com/java3y/austin/common/domain/SimpleTaskInfo.java create mode 100644 austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/TraceServiceImpl.java create mode 100644 austin-service-api/src/main/java/com/java3y/austin/service/api/domain/TraceResponse.java create mode 100644 austin-service-api/src/main/java/com/java3y/austin/service/api/service/TraceService.java diff --git a/austin-common/src/main/java/com/java3y/austin/common/constant/AustinConstant.java b/austin-common/src/main/java/com/java3y/austin/common/constant/AustinConstant.java index 961848a..4de52b3 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/constant/AustinConstant.java +++ b/austin-common/src/main/java/com/java3y/austin/common/constant/AustinConstant.java @@ -27,6 +27,10 @@ public class AustinConstant { */ public static final String SEND_ALL = "@all"; + public static final String CACHE_KEY_PREFIX = "Austin"; + + + public static final String MESSAGE_ID = "MessageId"; /** * 默认的常量,如果新建模板/账号时,没传入则用该常量 diff --git a/austin-common/src/main/java/com/java3y/austin/common/domain/AnchorInfo.java b/austin-common/src/main/java/com/java3y/austin/common/domain/AnchorInfo.java index 6b74a52..e90b5cf 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/domain/AnchorInfo.java +++ b/austin-common/src/main/java/com/java3y/austin/common/domain/AnchorInfo.java @@ -17,7 +17,17 @@ import java.util.Set; @AllArgsConstructor @NoArgsConstructor public class AnchorInfo { + /** + * 消息唯一Id(数据追踪使用) + * 生成逻辑参考 TaskInfoUtils + */ + private String bizId; + /** + * 消息唯一Id(数据追踪使用) + * 生成逻辑参考 TaskInfoUtils + */ + private String messageId; /** * 发送用户 */ diff --git a/austin-common/src/main/java/com/java3y/austin/common/domain/SimpleTaskInfo.java b/austin-common/src/main/java/com/java3y/austin/common/domain/SimpleTaskInfo.java new file mode 100644 index 0000000..a82457e --- /dev/null +++ b/austin-common/src/main/java/com/java3y/austin/common/domain/SimpleTaskInfo.java @@ -0,0 +1,36 @@ +package com.java3y.austin.common.domain; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @Author: sky + * @Date: 2023/7/13 10:43 + * @Description: SimpleTaskInfo + * @Version 1.0.0 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class SimpleTaskInfo { + + /** + * 业务消息发送Id, 用于链路追踪, 若不存在, 则使用 messageId + */ + private String bizId; + + /** + * 消息唯一Id(数据追踪使用) + * 生成逻辑参考 TaskInfoUtils + */ + private String messageId; + + /** + * 业务Id(数据追踪使用) + * 生成逻辑参考 TaskInfoUtils + */ + private Long businessId; +} diff --git a/austin-common/src/main/java/com/java3y/austin/common/domain/TaskInfo.java b/austin-common/src/main/java/com/java3y/austin/common/domain/TaskInfo.java index af8f377..da7d15f 100644 --- a/austin-common/src/main/java/com/java3y/austin/common/domain/TaskInfo.java +++ b/austin-common/src/main/java/com/java3y/austin/common/domain/TaskInfo.java @@ -19,6 +19,17 @@ import java.util.Set; @NoArgsConstructor public class TaskInfo { + /** + * 业务消息发送Id, 用于链路追踪, 若不存在, 则使用 messageId + */ + private String bizId; + + /** + * 消息唯一Id(数据追踪使用) + * 生成逻辑参考 TaskInfoUtils + */ + private String messageId; + /** * 消息模板Id */ diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java index 5b46b10..4317edb 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/deduplication/service/AbstractDeduplicationService.java @@ -46,7 +46,7 @@ public abstract class AbstractDeduplicationService implements DeduplicationServi // 剔除符合去重条件的用户 if (CollUtil.isNotEmpty(filterReceiver)) { taskInfo.getReceiver().removeAll(filterReceiver); - logUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(filterReceiver).state(param.getAnchorState().getCode()).build()); + logUtils.print(AnchorInfo.builder().bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).businessId(taskInfo.getBusinessId()).ids(filterReceiver).state(param.getAnchorState().getCode()).build()); } } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/discard/DiscardMessageService.java b/austin-handler/src/main/java/com/java3y/austin/handler/discard/DiscardMessageService.java index 57939e0..ec2dbbe 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/discard/DiscardMessageService.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/discard/DiscardMessageService.java @@ -38,7 +38,7 @@ public class DiscardMessageService { JSONArray array = JSON.parseArray(config.getProperty(DISCARD_MESSAGE_KEY, CommonConstant.EMPTY_VALUE_JSON_ARRAY)); if (array.contains(String.valueOf(taskInfo.getMessageTemplateId()))) { - logUtils.print(AnchorInfo.builder().businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).state(AnchorState.DISCARD.getCode()).build()); + logUtils.print(AnchorInfo.builder().bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).state(AnchorState.DISCARD.getCode()).build()); return true; } return false; diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java index 748eb06..d40d032 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/handler/BaseHandler.java @@ -59,10 +59,10 @@ public abstract class BaseHandler implements Handler { public void doHandler(TaskInfo taskInfo) { flowControl(taskInfo); if (handler(taskInfo)) { - logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); + logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_SUCCESS.getCode()).bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); return; } - logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); + logUtils.print(AnchorInfo.builder().state(AnchorState.SEND_FAIL.getCode()).bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java index f62aa3d..7ab3bb3 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java @@ -41,7 +41,7 @@ public class ConsumeServiceImpl implements ConsumeService { public void consume2Send(List taskInfoLists) { String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator())); for (TaskInfo taskInfo : taskInfoLists) { - logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build()); + logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build()); Task task = context.getBean(Task.class).setTaskInfo(taskInfo); taskPendingHolder.route(topicGroupId).execute(task); } diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java index 782697b..a44bfaf 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/shield/impl/ShieldServiceImpl.java @@ -47,13 +47,13 @@ public class ShieldServiceImpl implements ShieldService { if (isNight()) { if (ShieldType.NIGHT_SHIELD.getCode().equals(taskInfo.getShieldType())) { logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD.getCode()) - .businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); + .bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); } if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) { redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo, SerializerFeature.WriteClassName), SECONDS_OF_A_DAY); - logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); + logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build()); } taskInfo.setReceiver(new HashSet<>()); } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java index 5bc4430..7743b6e 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AfterParamCheckAction.java @@ -4,6 +4,7 @@ package com.java3y.austin.service.api.impl.action; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ReUtil; import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.SimpleTaskInfo; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.IdType; import com.java3y.austin.common.enums.RespStatusEnum; @@ -49,8 +50,11 @@ public class AfterParamCheckAction implements BusinessProcess { filterIllegalReceiver(taskInfo); if (CollUtil.isEmpty(taskInfo)) { - context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS)); + context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS, "手机号或邮箱不合法, 无有效的发送任务")); } + + // 数据组装 + context.setResponse(BasicResultVO.success(taskInfo.stream().map(v -> SimpleTaskInfo.builder().businessId(v.getBusinessId()).messageId(v.getMessageId()).bizId(v.getBizId()).build()).collect(Collectors.toList()))); } /** diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java index 7360002..831e11a 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/AssembleAction.java @@ -79,6 +79,8 @@ public class AssembleAction implements BusinessProcess { for (MessageParam messageParam : messageParamList) { TaskInfo taskInfo = TaskInfo.builder() + .messageId(TaskInfoUtils.generateMessageId()) + .bizId(messageParam.getBizId()) .messageTemplateId(messageTemplate.getId()) .businessId(TaskInfoUtils.generateBusinessId(messageTemplate.getId(), messageTemplate.getTemplateType())) .receiver(new HashSet<>(Arrays.asList(messageParam.getReceiver().split(String.valueOf(StrUtil.C_COMMA))))) @@ -90,6 +92,10 @@ public class AssembleAction implements BusinessProcess { .sendAccount(messageTemplate.getSendAccount()) .contentModel(getContentModelValue(messageTemplate, messageParam)).build(); + if (StrUtil.isBlank(taskInfo.getBizId())) { + taskInfo.setBizId(taskInfo.getMessageId()); + } + taskInfoList.add(taskInfo); } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java index 671748f..4ec8ad1 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/PreParamCheckAction.java @@ -34,7 +34,7 @@ public class PreParamCheckAction implements BusinessProcess { // 1.没有传入 消息模板Id 或者 messageParam if (Objects.isNull(messageTemplateId) || CollUtil.isEmpty(messageParamList)) { - context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS)); + context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS, "模板ID或参数列表为空")); return; } @@ -43,7 +43,7 @@ public class PreParamCheckAction implements BusinessProcess { .filter(messageParam -> !StrUtil.isBlank(messageParam.getReceiver())) .collect(Collectors.toList()); if (CollUtil.isEmpty(resultMessageParamList)) { - context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS)); + context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.CLIENT_BAD_PARAMETERS, "含接受者的参数列表为空")); return; } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java index 33092b4..058413f 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/RecallServiceImpl.java @@ -27,7 +27,7 @@ public class RecallServiceImpl implements RecallService { @Override public SendResponse recall(SendRequest sendRequest) { if(ObjectUtils.isEmpty(sendRequest)){ - return new SendResponse(RespStatusEnum.CLIENT_BAD_PARAMETERS.getCode(), RespStatusEnum.CLIENT_BAD_PARAMETERS.getMsg()); + return new SendResponse(RespStatusEnum.CLIENT_BAD_PARAMETERS.getCode(), RespStatusEnum.CLIENT_BAD_PARAMETERS.getMsg(), null); } SendTaskModel sendTaskModel = SendTaskModel.builder() .messageTemplateId(sendRequest.getMessageTemplateId()) @@ -38,6 +38,6 @@ public class RecallServiceImpl implements RecallService { .needBreak(false) .response(BasicResultVO.success()).build(); ProcessContext process = processController.process(context); - return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg()); + return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg(), null); } } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/SendServiceImpl.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/SendServiceImpl.java index 815c521..f5b1325 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/SendServiceImpl.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/SendServiceImpl.java @@ -1,6 +1,7 @@ package com.java3y.austin.service.api.impl.service; import cn.monitor4all.logRecord.annotation.OperationLog; +import com.java3y.austin.common.domain.SimpleTaskInfo; import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.vo.BasicResultVO; import com.java3y.austin.service.api.domain.BatchSendRequest; @@ -15,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Collections; +import java.util.List; /** * 发送接口 @@ -31,7 +33,7 @@ public class SendServiceImpl implements SendService { @OperationLog(bizType = "SendService#send", bizId = "#sendRequest.messageTemplateId", msg = "#sendRequest") public SendResponse send(SendRequest sendRequest) { if(ObjectUtils.isEmpty(sendRequest)){ - return new SendResponse(RespStatusEnum.CLIENT_BAD_PARAMETERS.getCode(), RespStatusEnum.CLIENT_BAD_PARAMETERS.getMsg()); + return new SendResponse(RespStatusEnum.CLIENT_BAD_PARAMETERS.getCode(), RespStatusEnum.CLIENT_BAD_PARAMETERS.getMsg(), null); } SendTaskModel sendTaskModel = SendTaskModel.builder() @@ -47,7 +49,7 @@ public class SendServiceImpl implements SendService { ProcessContext process = processController.process(context); - return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg()); + return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg(), (List) process.getResponse().getData()); } @Override @@ -66,7 +68,7 @@ public class SendServiceImpl implements SendService { ProcessContext process = processController.process(context); - return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg()); + return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg(), null); } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/TraceServiceImpl.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/TraceServiceImpl.java new file mode 100644 index 0000000..246d891 --- /dev/null +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/service/TraceServiceImpl.java @@ -0,0 +1,49 @@ +package com.java3y.austin.service.api.impl.service; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.constant.AustinConstant; +import com.java3y.austin.common.domain.SimpleAnchorInfo; +import com.java3y.austin.common.enums.RespStatusEnum; +import com.java3y.austin.service.api.domain.SendResponse; +import com.java3y.austin.service.api.domain.TraceResponse; +import com.java3y.austin.service.api.service.TraceService; +import com.java3y.austin.support.utils.RedisUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * @Author: sky + * @Date: 2023/7/13 13:45 + * @Description: TraceServiceImpl + * @Version 1.0.0 + */ +@Service +@Primary +public class TraceServiceImpl implements TraceService { + + @Autowired + private RedisUtils redisUtils; + + @Override + public TraceResponse traceByMessageId(String messageId) { + if (StrUtil.isBlank(messageId)) { + return new TraceResponse(RespStatusEnum.CLIENT_BAD_PARAMETERS.getCode(), RespStatusEnum.CLIENT_BAD_PARAMETERS.getMsg(), null); + } + String redisMessageKey = StrUtil.join(StrUtil.COLON, AustinConstant.CACHE_KEY_PREFIX, AustinConstant.MESSAGE_ID, messageId); + List messageList = redisUtils.lRange(redisMessageKey, 0, -1); + if (CollUtil.isEmpty(messageList)) { + return new TraceResponse(RespStatusEnum.FAIL.getCode(), RespStatusEnum.FAIL.getMsg(), null); + } + + // 0. 按时间排序 + List sortAnchorList = messageList.stream().map(s -> JSON.parseObject(s, SimpleAnchorInfo.class)).sorted((o1, o2) -> Math.toIntExact(o1.getTimestamp() - o2.getTimestamp())).collect(Collectors.toList()); + + return new TraceResponse(RespStatusEnum.SUCCESS.getCode(), RespStatusEnum.SUCCESS.getMsg(), sortAnchorList); + } +} diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/MessageParam.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/MessageParam.java index 5fbb17f..8ff185b 100644 --- a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/MessageParam.java +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/MessageParam.java @@ -21,6 +21,11 @@ import java.util.Map; @Builder public class MessageParam { + /** + * 业务消息发送Id, 用于链路追踪, 若不存在, austin 则生成一个消息Id + */ + private String bizId; + /** * @Description: 接收者 * 多个用,逗号号分隔开 diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendResponse.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendResponse.java index 60559ac..7335c62 100644 --- a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendResponse.java +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/SendResponse.java @@ -1,10 +1,13 @@ package com.java3y.austin.service.api.domain; +import com.java3y.austin.common.domain.SimpleTaskInfo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; +import java.util.List; + /** * 发送接口返回值 @@ -20,10 +23,14 @@ public class SendResponse { * 响应状态 */ private String code; - /** * 响应编码 */ private String msg; + /** + * 实际发送任务列表 + */ + private List data; + } diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/TraceResponse.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/TraceResponse.java new file mode 100644 index 0000000..cf67172 --- /dev/null +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/domain/TraceResponse.java @@ -0,0 +1,35 @@ +package com.java3y.austin.service.api.domain; + +import com.java3y.austin.common.domain.SimpleAnchorInfo; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.util.List; + +/** + * @Author: sky + * @Date: 2023/7/13 13:38 + * @Description: TraceResponse + * @Version 1.0.0 + */ +@Data +@Accessors(chain = true) +@AllArgsConstructor +@NoArgsConstructor +public class TraceResponse { + /** + * 响应状态 + */ + private String code; + /** + * 响应编码 + */ + private String msg; + + /** + * 埋点信息 + */ + private List data; +} diff --git a/austin-service-api/src/main/java/com/java3y/austin/service/api/service/TraceService.java b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/TraceService.java new file mode 100644 index 0000000..5e875bd --- /dev/null +++ b/austin-service-api/src/main/java/com/java3y/austin/service/api/service/TraceService.java @@ -0,0 +1,20 @@ +package com.java3y.austin.service.api.service; + +import com.java3y.austin.service.api.domain.TraceResponse; + +/** + * 链路查询接口 + * @Author: sky + * @Date: 2023/7/13 13:35 + * @Description: TraceService + * @Version 1.0.0 + */ +public interface TraceService { + + /** + * 基于消息 ID 查询 链路结果 + * @param messageId + * @return + */ + TraceResponse traceByMessageId(String messageId); +} diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java index 126f2b8..23eba17 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java @@ -1,8 +1,10 @@ package com.java3y.austin.stream.sink; import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; +import com.java3y.austin.common.constant.AustinConstant; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.SimpleAnchorInfo; import com.java3y.austin.stream.utils.LettuceRedisUtils; @@ -10,6 +12,7 @@ import io.lettuce.core.RedisFuture; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import java.time.Duration; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -40,13 +43,21 @@ public class AustinSink implements SinkFunction { try { LettuceRedisUtils.pipeline(redisAsyncCommands -> { List> redisFutures = new ArrayList<>(); + /** + * 1.构建messageId维度的链路信息 数据结构list:{key,list} + * key:Austin:MessageId:{messageId},listValue:[{timestamp,state,businessId},{timestamp,state,businessId}] + */ + String redisMessageKey = StrUtil.join(StrUtil.COLON, AustinConstant.CACHE_KEY_PREFIX, AustinConstant.MESSAGE_ID, info.getMessageId()); + SimpleAnchorInfo messageAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build(); + redisFutures.add(redisAsyncCommands.lpush(redisMessageKey.getBytes(), JSON.toJSONString(messageAnchorInfo).getBytes())); + redisFutures.add(redisAsyncCommands.expire(redisMessageKey.getBytes(), Duration.ofDays(3).toMillis() / 1000)); /** * 1.构建userId维度的链路信息 数据结构list:{key,list} * key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}] */ - SimpleAnchorInfo simpleAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build(); + SimpleAnchorInfo userAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build(); for (String id : info.getIds()) { - redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(simpleAnchorInfo).getBytes())); + redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(userAnchorInfo).getBytes())); redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000)); } diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/TaskInfoUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/TaskInfoUtils.java index f450d77..878b24a 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/TaskInfoUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/TaskInfoUtils.java @@ -2,6 +2,7 @@ package com.java3y.austin.support.utils; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.IdUtil; import com.java3y.austin.common.constant.CommonConstant; import java.util.Date; @@ -16,6 +17,13 @@ public class TaskInfoUtils { private static final int TYPE_FLAG = 1000000; private static final String CODE = "track_code_bid"; + /** + * 生成任务唯一Id + * @return + */ + public static String generateMessageId() { + return IdUtil.nanoId(); + } /** * 生成BusinessId * 模板类型+模板ID+当天日期 diff --git a/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java b/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java index 9cca3c6..618c0fa 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java +++ b/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java @@ -36,6 +36,15 @@ public class DataController { @Autowired private DataService dataService; + @PostMapping("/message") + @ApiOperation("/获取【72小时】发送消息的全链路数据") + public UserTimeLineVo getMessageData(@RequestBody DataParam dataParam) { + if (Objects.isNull(dataParam) || StrUtil.isBlank(dataParam.getMessageId())) { + return UserTimeLineVo.builder().items(new ArrayList<>()).build(); + } + return dataService.getTraceMessageInfo(dataParam.getMessageId()); + } + @PostMapping("/user") @ApiOperation("/获取【当天】用户接收消息的全链路数据") public UserTimeLineVo getUserData(@RequestBody DataParam dataParam) { diff --git a/austin-web/src/main/java/com/java3y/austin/web/service/DataService.java b/austin-web/src/main/java/com/java3y/austin/web/service/DataService.java index 42b09b2..bf91271 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/service/DataService.java +++ b/austin-web/src/main/java/com/java3y/austin/web/service/DataService.java @@ -12,6 +12,14 @@ import com.java3y.austin.web.vo.amis.UserTimeLineVo; */ public interface DataService { + /** + * 获取全链路追踪 消息自身维度信息 + * + * @param messageId 消息 + * @return + */ + UserTimeLineVo getTraceMessageInfo(String messageId); + /** * 获取全链路追踪 用户维度信息 * diff --git a/austin-web/src/main/java/com/java3y/austin/web/service/impl/DataServiceImpl.java b/austin-web/src/main/java/com/java3y/austin/web/service/impl/DataServiceImpl.java index 2ac25e2..4f99809 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/service/impl/DataServiceImpl.java +++ b/austin-web/src/main/java/com/java3y/austin/web/service/impl/DataServiceImpl.java @@ -12,6 +12,8 @@ import com.java3y.austin.common.domain.SimpleAnchorInfo; import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.common.enums.ChannelType; import com.java3y.austin.common.enums.EnumUtil; +import com.java3y.austin.service.api.domain.TraceResponse; +import com.java3y.austin.service.api.service.TraceService; import com.java3y.austin.support.dao.MessageTemplateDao; import com.java3y.austin.support.dao.SmsRecordDao; import com.java3y.austin.support.domain.MessageTemplate; @@ -47,6 +49,18 @@ public class DataServiceImpl implements DataService { @Autowired private SmsRecordDao smsRecordDao; + @Autowired + private TraceService traceService; + + + @Override + public UserTimeLineVo getTraceMessageInfo(String messageId) { + TraceResponse traceResponse = traceService.traceByMessageId(messageId); + if (CollUtil.isEmpty(traceResponse.getData())) { + return UserTimeLineVo.builder().items(new ArrayList<>()).build(); + } + return buildUserTimeLineVo(traceResponse.getData()); + } @Override public UserTimeLineVo getTraceUserInfo(String receiver) { @@ -57,51 +71,7 @@ public class DataServiceImpl implements DataService { // 0. 按时间排序 List sortAnchorList = userInfoList.stream().map(s -> JSON.parseObject(s, SimpleAnchorInfo.class)).sorted((o1, o2) -> Math.toIntExact(o1.getTimestamp() - o2.getTimestamp())).collect(Collectors.toList()); - - // 1. 对相同的businessId进行分类 {"businessId":[{businessId,state,timeStamp},{businessId,state,timeStamp}]} - Map> map = MapUtil.newHashMap(); - for (SimpleAnchorInfo simpleAnchorInfo : sortAnchorList) { - List simpleAnchorInfos = map.get(String.valueOf(simpleAnchorInfo.getBusinessId())); - if (CollUtil.isEmpty(simpleAnchorInfos)) { - simpleAnchorInfos = new ArrayList<>(); - } - simpleAnchorInfos.add(simpleAnchorInfo); - map.put(String.valueOf(simpleAnchorInfo.getBusinessId()), simpleAnchorInfos); - } - - // 2. 封装vo 给到前端渲染展示 - List items = new ArrayList<>(); - for (Map.Entry> entry : map.entrySet()) { - Long messageTemplateId = TaskInfoUtils.getMessageTemplateIdFromBusinessId(Long.valueOf(entry.getKey())); - MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).orElse(null); - if (Objects.isNull(messageTemplate)) { - continue; - } - - StringBuilder sb = new StringBuilder(); - for (SimpleAnchorInfo simpleAnchorInfo : entry.getValue()) { - if (AnchorState.RECEIVE.getCode().equals(simpleAnchorInfo.getState())) { - sb.append(StrPool.CRLF); - } - String startTime = DateUtil.format(new Date(simpleAnchorInfo.getTimestamp()), DatePattern.NORM_DATETIME_PATTERN); - String stateDescription = EnumUtil.getDescriptionByCode(simpleAnchorInfo.getState(), AnchorState.class); - sb.append(startTime).append(StrPool.C_COLON).append(stateDescription).append("==>"); - } - - for (String detail : sb.toString().split(StrPool.CRLF)) { - if (StrUtil.isNotBlank(detail)) { - UserTimeLineVo.ItemsVO itemsVO = UserTimeLineVo.ItemsVO.builder() - .businessId(entry.getKey()) - .sendType(EnumUtil.getEnumByCode(messageTemplate.getSendChannel(), ChannelType.class).getDescription()) - .creator(messageTemplate.getCreator()) - .title(messageTemplate.getName()) - .detail(detail) - .build(); - items.add(itemsVO); - } - } - } - return UserTimeLineVo.builder().items(items).build(); + return buildUserTimeLineVo(sortAnchorList); } @Override @@ -153,4 +123,51 @@ public class DataServiceImpl implements DataService { } return businessId; } + + private UserTimeLineVo buildUserTimeLineVo(List sortAnchorList){ + // 1. 对相同的businessId进行分类 {"businessId":[{businessId,state,timeStamp},{businessId,state,timeStamp}]} + Map> map = MapUtil.newHashMap(); + for (SimpleAnchorInfo simpleAnchorInfo : sortAnchorList) { + List simpleAnchorInfos = map.get(String.valueOf(simpleAnchorInfo.getBusinessId())); + if (CollUtil.isEmpty(simpleAnchorInfos)) { + simpleAnchorInfos = new ArrayList<>(); + } + simpleAnchorInfos.add(simpleAnchorInfo); + map.put(String.valueOf(simpleAnchorInfo.getBusinessId()), simpleAnchorInfos); + } + + // 2. 封装vo 给到前端渲染展示 + List items = new ArrayList<>(); + for (Map.Entry> entry : map.entrySet()) { + Long messageTemplateId = TaskInfoUtils.getMessageTemplateIdFromBusinessId(Long.valueOf(entry.getKey())); + MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).orElse(null); + if (Objects.isNull(messageTemplate)) { + continue; + } + + StringBuilder sb = new StringBuilder(); + for (SimpleAnchorInfo simpleAnchorInfo : entry.getValue()) { + if (AnchorState.RECEIVE.getCode().equals(simpleAnchorInfo.getState())) { + sb.append(StrPool.CRLF); + } + String startTime = DateUtil.format(new Date(simpleAnchorInfo.getTimestamp()), DatePattern.NORM_DATETIME_PATTERN); + String stateDescription = EnumUtil.getDescriptionByCode(simpleAnchorInfo.getState(), AnchorState.class); + sb.append(startTime).append(StrPool.C_COLON).append(stateDescription).append("==>"); + } + + for (String detail : sb.toString().split(StrPool.CRLF)) { + if (StrUtil.isNotBlank(detail)) { + UserTimeLineVo.ItemsVO itemsVO = UserTimeLineVo.ItemsVO.builder() + .businessId(entry.getKey()) + .sendType(EnumUtil.getEnumByCode(messageTemplate.getSendChannel(), ChannelType.class).getDescription()) + .creator(messageTemplate.getCreator()) + .title(messageTemplate.getName()) + .detail(detail) + .build(); + items.add(itemsVO); + } + } + } + return UserTimeLineVo.builder().items(items).build(); + } } diff --git a/austin-web/src/main/java/com/java3y/austin/web/vo/DataParam.java b/austin-web/src/main/java/com/java3y/austin/web/vo/DataParam.java index ca05462..4f3faa7 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/vo/DataParam.java +++ b/austin-web/src/main/java/com/java3y/austin/web/vo/DataParam.java @@ -17,6 +17,12 @@ import lombok.NoArgsConstructor; @NoArgsConstructor public class DataParam { + + /** + * 查看消息Id的链路信息 + */ + private String messageId; + /** * 查看用户的链路信息 */