diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/xxl/enums/ScheduleTypeEnum.java b/austin-cron/src/main/java/com/java3y/austin/cron/xxl/enums/ScheduleTypeEnum.java index ed73f34..5da2e35 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/xxl/enums/ScheduleTypeEnum.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/xxl/enums/ScheduleTypeEnum.java @@ -2,6 +2,7 @@ package com.java3y.austin.cron.xxl.enums; /** * 调度类型 + * * @author 3y */ public enum ScheduleTypeEnum { diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/domain/SimpleAnchorInfo.java b/austin-stream/src/main/java/com/java3y/austin/stream/domain/SimpleAnchorInfo.java index 63a5a1b..ce51d4b 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/domain/SimpleAnchorInfo.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/domain/SimpleAnchorInfo.java @@ -7,6 +7,7 @@ import lombok.NoArgsConstructor; /** * 简单的埋点信息 + * * @author 3y */ @Data diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java index 9c19aae..ac19156 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java @@ -22,20 +22,8 @@ import java.util.List; @Slf4j public class AustinSink implements SinkFunction { - /** - * batch的超时时间和大小 - */ - private static final Integer BATCH_SIZE = 10; - private static final Long TIME_OUT = 2000L; - - /** - * 用ThreadLocal来暂存数据做批量处理 - */ - private static ThreadLocal> baseAnchors = ThreadLocal.withInitial(() -> new ArrayList<>(BATCH_SIZE)); - private static ThreadLocal lastClearTime = ThreadLocal.withInitial(() -> new Long(System.currentTimeMillis())); - @Override - public void invoke(AnchorInfo anchorInfo) throws Exception { + public void invoke(AnchorInfo anchorInfo, Context context) throws Exception { realTimeData(anchorInfo); offlineDate(anchorInfo); } @@ -46,43 +34,34 @@ public class AustinSink implements SinkFunction { * 1.用户维度(查看用户当天收到消息的链路详情),数量级大,只保留当天 * 2.消息模板维度(查看消息模板整体下发情况),数量级小,保留30天 * - * @param anchorInfo + * @param info */ - private void realTimeData(AnchorInfo anchorInfo) { - baseAnchors.get().add(anchorInfo); - if (baseAnchors.get().size() >= BATCH_SIZE || System.currentTimeMillis() - lastClearTime.get() >= TIME_OUT) { - - try { - LettuceRedisUtils.pipeline(redisAsyncCommands -> { - List> redisFutures = new ArrayList<>(); - for (AnchorInfo info : baseAnchors.get()) { - /** - * 1.构建userId维度的链路信息 数据结构list:{key,list} - * key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}] - */ - SimpleAnchorInfo simpleAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getTimestamp()).build(); - for (String id : info.getIds()) { - redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(simpleAnchorInfo).getBytes())); - redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000)); - } + private void realTimeData(AnchorInfo info) { + try { + LettuceRedisUtils.pipeline(redisAsyncCommands -> { + List> redisFutures = new ArrayList<>(); + /** + * 1.构建userId维度的链路信息 数据结构list:{key,list} + * key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}] + */ + SimpleAnchorInfo simpleAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getTimestamp()).build(); + for (String id : info.getIds()) { + redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(simpleAnchorInfo).getBytes())); + redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000)); + } - /** - * 2.构建消息模板维度的链路信息 数据结构hash:{key,hash} - * key:businessId,hashValue:{state,stateCount} - */ - redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(), - String.valueOf(info.getState()).getBytes(), info.getIds().size())); - redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(), DateUtil.offsetDay(new Date(), 30).getTime())); - } - return redisFutures; - }); + /** + * 2.构建消息模板维度的链路信息 数据结构hash:{key,hash} + * key:businessId,hashValue:{state,stateCount} + */ + redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(), + String.valueOf(info.getState()).getBytes(), info.getIds().size())); + redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(), DateUtil.offsetDay(new Date(), 30).getTime())); + return redisFutures; + }); - } catch (Exception e) { - log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e)); - } finally { - lastClearTime.set(System.currentTimeMillis()); - baseAnchors.get().clear(); - } + } catch (Exception e) { + log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e)); } } diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java index 57fba4e..8247cae 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java @@ -39,11 +39,40 @@ public class RedisUtils { } } } catch (Exception e) { - log.error("redis mGet fail! e:{}", Throwables.getStackTraceAsString(e)); + log.error("RedisUtils#mGet fail! e:{}", Throwables.getStackTraceAsString(e)); } return result; } + /** + * hGetAll + * + * @param key + */ + public Map hGetAll(String key) { + try { + Map entries = redisTemplate.opsForHash().entries(key); + return entries; + } catch (Exception e) { + log.error("RedisUtils#hGetAll fail! e:{}", Throwables.getStackTraceAsString(e)); + } + return null; + } + + /** + * lRange + * + * @param key + */ + public List lRange(String key, long start, long end) { + try { + return redisTemplate.opsForList().range(key, start, end); + } catch (Exception e) { + log.error("RedisUtils#lRange fail! e:{}", Throwables.getStackTraceAsString(e)); + } + return null; + } + /** * pipeline 设置 key-value 并设置过期时间 */ @@ -57,7 +86,7 @@ public class RedisUtils { return null; }); } catch (Exception e) { - log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e)); + log.error("RedisUtils#pipelineSetEx fail! e:{}", Throwables.getStackTraceAsString(e)); } } @@ -67,7 +96,7 @@ public class RedisUtils { * @param seconds 过期时间 * @param delta 自增的步长 */ - public void pipelineHashIncrByEX(Map keyValues, Long seconds, Long delta) { + public void pipelineHashIncrByEx(Map keyValues, Long seconds, Long delta) { try { redisTemplate.executePipelined((RedisCallback) connection -> { for (Map.Entry entry : keyValues.entrySet()) { diff --git a/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java b/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java index 9855f5b..a85acbc 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java +++ b/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java @@ -1,12 +1,17 @@ package com.java3y.austin.web.controller; +import com.alibaba.fastjson.JSON; import com.java3y.austin.common.vo.BasicResultVO; -import com.java3y.austin.support.domain.MessageTemplate; +import com.java3y.austin.support.utils.RedisUtils; +import com.java3y.austin.web.vo.DataParam; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; +import java.util.Map; + /** * 获取数据接口(全链路追踪) * @@ -14,20 +19,22 @@ import org.springframework.web.bind.annotation.*; */ @Slf4j @RestController -@RequestMapping("/messageTemplate") +@RequestMapping("/trace") @Api("获取数据接口(全链路追踪)") @CrossOrigin(origins = "http://localhost:3000", allowCredentials = "true", allowedHeaders = "*") public class DataController { - /** - * 如果Id存在,则修改 - * 如果Id不存在,则保存 - */ + @Autowired + private RedisUtils redisUtils; + @PostMapping("/data") @ApiOperation("/获取数据") - public BasicResultVO getData(@RequestBody MessageTemplate messageTemplate) { + public BasicResultVO getData(@RequestBody DataParam dataParam) { + Long businessId = dataParam.getBusinessId(); + Map objectObjectMap = redisUtils.hGetAll(String.valueOf(businessId)); + log.info("data:{}", JSON.toJSONString(objectObjectMap)); return BasicResultVO.success(); } diff --git a/austin-web/src/main/java/com/java3y/austin/web/service/DataService.java b/austin-web/src/main/java/com/java3y/austin/web/service/DataService.java new file mode 100644 index 0000000..bba2d9a --- /dev/null +++ b/austin-web/src/main/java/com/java3y/austin/web/service/DataService.java @@ -0,0 +1,10 @@ +package com.java3y.austin.web.service; + +/** + * 数据链路追踪获取接口 + * @author 3y + */ +public interface DataService { + + +}