diff --git a/austin-stream/pom.xml b/austin-stream/pom.xml
index 81f43fc..a04516f 100644
--- a/austin-stream/pom.xml
+++ b/austin-stream/pom.xml
@@ -44,7 +44,6 @@
<version>0.0.1-SNAPSHOT</version>
-
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java
index c72c114..dc13cac 100644
--- a/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java
@@ -1,14 +1,16 @@
package com.java3y.austin.stream;
+import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.stream.constants.AustinFlinkConstant;
-import com.java3y.austin.stream.utils.FlinkUtils;
-import com.java3y.austin.stream.utils.SpringContextUtils;
+import com.java3y.austin.stream.function.AustinFlatMapFunction;
+import com.java3y.austin.stream.sink.AustinSink;
+import com.java3y.austin.stream.utils.MessageQueueUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
* Flink bootstrap class
@@ -20,29 +22,25 @@ public class AustinBootStrap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- SpringContextUtils.loadContext(AustinFlinkConstant.SPRING_CONFIG_PATH);
/**
* 1. Get the Kafka consumer
*/
- KafkaSource<String> kafkaConsumer = SpringContextUtils.getBean(FlinkUtils.class).getKafkaConsumer(AustinFlinkConstant.TOPIC_NAME, AustinFlinkConstant.GROUP_ID, AustinFlinkConstant.BROKER);
+ KafkaSource<String> kafkaConsumer = MessageQueueUtils.getKafkaConsumer(AustinFlinkConstant.TOPIC_NAME, AustinFlinkConstant.GROUP_ID, AustinFlinkConstant.BROKER);
DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), AustinFlinkConstant.SOURCE_NAME);
/**
* 2. Transform and process the data
*/
+ SingleOutputStreamOperator<AnchorInfo> dataStream = kafkaSource.flatMap(new AustinFlatMapFunction()).name(AustinFlinkConstant.FUNCTION_NAME);
/**
* 3. Write real-time data into Redis in multiple dimensions (implemented); write offline data into Hive (not implemented yet)
*/
- kafkaSource.addSink(new SinkFunction<String>() {
- @Override
- public void invoke(String value, Context context) throws Exception {
- log.error("kafka value:{}", value);
- }
- });
- env.execute("AustinBootStrap");
+ dataStream.addSink(new AustinSink()).name(AustinFlinkConstant.SINK_NAME);
+ env.execute(AustinFlinkConstant.JOB_NAME);
+
}
}
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/callback/RedisPipelineCallBack.java b/austin-stream/src/main/java/com/java3y/austin/stream/callback/RedisPipelineCallBack.java
new file mode 100644
index 0000000..3f05bc6
--- /dev/null
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/callback/RedisPipelineCallBack.java
@@ -0,0 +1,23 @@
+package com.java3y.austin.stream.callback;
+
+import io.lettuce.core.RedisFuture;
+import io.lettuce.core.api.async.RedisAsyncCommands;
+
+import java.util.List;
+
+/**
+ * Redis pipeline callback definition
+ *
+ * @author 3y
+ */
+public interface RedisPipelineCallBack {
+
+ /**
+ * The concrete logic to execute on the pipelined connection
+ *
+ * @param redisAsyncCommands
+ * @return
+ */
+ List<RedisFuture<?>> invoke(RedisAsyncCommands<byte[], byte[]> redisAsyncCommands);
+
+
+}
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java
index e19af36..75710cd 100644
--- a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java
@@ -1,20 +1,26 @@
package com.java3y.austin.stream.constants;
+/**
+ * Flink constants
+ * @author 3y
+ */
public class AustinFlinkConstant {
/**
* Kafka configuration
- * TODO configure the Kafka broker before use
+ * TODO configure the Kafka broker ip:port before use
*/
public static final String GROUP_ID = "austinLogGroup";
public static final String TOPIC_NAME = "austinLog";
public static final String BROKER = "ip:port";
-
/**
- * Path of the Spring config file
+ * Redis configuration
+ * TODO configure the Redis ip:port before use
*/
- public static final String SPRING_CONFIG_PATH = "classpath*:austin-spring.xml";
+ public static final String REDIS_IP = "ip";
+ public static final String REDIS_PORT = "port";
+ public static final String REDIS_PASSWORD = "austin";
/**
@@ -23,5 +29,7 @@ public class AustinFlinkConstant {
public static final String SOURCE_NAME = "austin_kafka_source";
public static final String FUNCTION_NAME = "austin_transfer";
public static final String SINK_NAME = "austin_sink";
+ public static final String JOB_NAME = "AustinBootStrap";
+
}
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/domain/SimpleAnchorInfo.java b/austin-stream/src/main/java/com/java3y/austin/stream/domain/SimpleAnchorInfo.java
new file mode 100644
index 0000000..63a5a1b
--- /dev/null
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/domain/SimpleAnchorInfo.java
@@ -0,0 +1,34 @@
+package com.java3y.austin.stream.domain;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Simplified anchor (tracking) info
+ * @author 3y
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SimpleAnchorInfo {
+
+
+ /**
+ * The concrete anchor point (state)
+ */
+ private int state;
+
+ /**
+ * Business id (used for data tracing)
+ * See TaskInfoUtils for how it is generated
+ */
+ private Long businessId;
+
+ /**
+ * Generation timestamp
+ */
+ private long timestamp;
+}
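For context, a SimpleAnchorInfo entry is what AustinSink (below) serializes into the per-user Redis list. A minimal sketch of one entry, with made-up values; serialization uses fastjson exactly as in the sink:

    // Illustrative only: the state code and businessId are hypothetical
    SimpleAnchorInfo info = SimpleAnchorInfo.builder()
            .state(60)                        // hypothetical state code
            .businessId(100000120220222L)     // hypothetical id in the TaskInfoUtils format
            .timestamp(System.currentTimeMillis())
            .build();
    String json = JSON.toJSONString(info);
    // json ≈ {"businessId":100000120220222,"state":60,"timestamp":1645500000000}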
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/function/AustinFlatMapFunction.java b/austin-stream/src/main/java/com/java3y/austin/stream/function/AustinFlatMapFunction.java
new file mode 100644
index 0000000..63a742c
--- /dev/null
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/function/AustinFlatMapFunction.java
@@ -0,0 +1,20 @@
+package com.java3y.austin.stream.function;
+
+import com.alibaba.fastjson.JSON;
+import com.java3y.austin.common.domain.AnchorInfo;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * @author 3y
+ * @date 2022/2/22
+ * Processing: parse the raw message
+ */
+public class AustinFlatMapFunction implements FlatMapFunction<String, AnchorInfo> {
+
+ @Override
+ public void flatMap(String value, Collector<AnchorInfo> collector) throws Exception {
+ AnchorInfo anchorInfo = JSON.parseObject(value, AnchorInfo.class);
+ collector.collect(anchorInfo);
+ }
+}
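The function simply deserializes each Kafka record into an AnchorInfo. A sketch of an input record, assuming only the fields that AustinSink reads (ids, state, businessId, timestamp); the authoritative field set lives in austin-common and may differ:

    // Hypothetical record from the austinLog topic
    String value = "{\"ids\":[\"13788888888\"],\"state\":60,\"businessId\":100000120220222,\"timestamp\":1645500000000}";
    AnchorInfo anchorInfo = JSON.parseObject(value, AnchorInfo.class);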
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java
index 7e0e3b1..9c19aae 100644
--- a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java
@@ -1,20 +1,99 @@
package com.java3y.austin.stream.sink;
+import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
+import com.google.common.base.Throwables;
import com.java3y.austin.common.domain.AnchorInfo;
+import com.java3y.austin.stream.domain.SimpleAnchorInfo;
+import com.java3y.austin.stream.utils.LettuceRedisUtils;
+import io.lettuce.core.RedisFuture;
import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
/**
- * mock
+ * Sink anchor messages into Redis/Hive
+ *
+ * @author 3y
*/
@Slf4j
-public class AustinSink extends RichSinkFunction<AnchorInfo> {
+public class AustinSink implements SinkFunction<AnchorInfo> {
+
+ /**
+ * Batch size and flush timeout
+ */
+ private static final Integer BATCH_SIZE = 10;
+ private static final Long TIME_OUT = 2000L;
+
+ /**
+ * Buffer records in a ThreadLocal so they can be flushed in batches
+ */
+ private static ThreadLocal<List<AnchorInfo>> baseAnchors = ThreadLocal.withInitial(() -> new ArrayList<>(BATCH_SIZE));
+ private static ThreadLocal<Long> lastClearTime = ThreadLocal.withInitial(System::currentTimeMillis);
@Override
- public void invoke(AnchorInfo value, Context context) throws Exception {
+ public void invoke(AnchorInfo anchorInfo) throws Exception {
+ realTimeData(anchorInfo);
+ offlineData(anchorInfo);
+ }
+
+
+ /**
+ * Write real-time data into Redis
+ * 1. User dimension (trace of the messages a user received today): high volume, kept for the current day only
+ * 2. Message-template dimension (overall delivery status per template): low volume, kept for 30 days
+ *
+ * @param anchorInfo
+ */
+ private void realTimeData(AnchorInfo anchorInfo) {
+ baseAnchors.get().add(anchorInfo);
+ if (baseAnchors.get().size() >= BATCH_SIZE || System.currentTimeMillis() - lastClearTime.get() >= TIME_OUT) {
- log.error("sink consume value:{}", JSON.toJSONString(value));
+ try {
+ LettuceRedisUtils.pipeline(redisAsyncCommands -> {
+ List<RedisFuture<?>> redisFutures = new ArrayList<>();
+ for (AnchorInfo info : baseAnchors.get()) {
+ /**
+ * 1. Build the per-user trace. Data structure: list {key, list}
+ * key: userId, listValue: [{timestamp,state,businessId},{timestamp,state,businessId}]
+ */
+ SimpleAnchorInfo simpleAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getTimestamp()).build();
+ for (String id : info.getIds()) {
+ redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(simpleAnchorInfo).getBytes()));
+ redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
+ }
+
+ /**
+ * 2. Build the per-template trace. Data structure: hash {key, hash}
+ * key: businessId, hashValue: {state, stateCount}
+ */
+ redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(),
+ String.valueOf(info.getState()).getBytes(), info.getIds().size()));
+ redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(), (DateUtil.offsetDay(new Date(), 30).getTime() - DateUtil.current()) / 1000));
+ }
+ return redisFutures;
+ });
+
+ } catch (Exception e) {
+ log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e));
+ } finally {
+ lastClearTime.set(System.currentTimeMillis());
+ baseAnchors.get().clear();
+ }
+ }
+ }
+
+ /**
+ * Write offline data into Hive (not implemented yet)
+ *
+ * @param anchorInfo
+ */
+ private void offlineData(AnchorInfo anchorInfo) {
}
+
+
}
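The sink therefore leaves two structures in Redis: a list keyed by userId holding SimpleAnchorInfo JSON (expiring at end of day), and a hash keyed by businessId mapping state to a counter (expiring after 30 days). A minimal read-back sketch with Lettuce's sync API, assuming the same Redis instance as LettuceRedisUtils below and made-up ids; the sink writes with ByteArrayCodec, but the payloads are plain UTF-8, so the default String codec reads them back:

    RedisClient client = RedisClient.create(RedisURI.Builder.redis(AustinFlinkConstant.REDIS_IP)
            .withPort(Integer.valueOf(AustinFlinkConstant.REDIS_PORT))
            .withPassword(AustinFlinkConstant.REDIS_PASSWORD.toCharArray()).build());
    RedisCommands<String, String> commands = client.connect().sync();
    // 1. Per-user trace: each element is a SimpleAnchorInfo JSON string
    List<String> userTrace = commands.lrange("13788888888", 0, -1);
    // 2. Per-template counters: field = state, value = number of userIds that reached it
    Map<String, String> templateStats = commands.hgetall("100000120220222");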
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java
new file mode 100644
index 0000000..0c64cd2
--- /dev/null
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java
@@ -0,0 +1,53 @@
+package com.java3y.austin.stream.utils;
+
+import com.java3y.austin.stream.callback.RedisPipelineCallBack;
+import com.java3y.austin.stream.constants.AustinFlinkConstant;
+import io.lettuce.core.LettuceFutures;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisFuture;
+import io.lettuce.core.RedisURI;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.async.RedisAsyncCommands;
+import io.lettuce.core.codec.ByteArrayCodec;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author 3y
+ * @date 2022/2/22
+ * Redis access without a Spring context, built on top of Lettuce
+ */
+public class LettuceRedisUtils {
+
+ /**
+ * Initialize the redisClient
+ */
+ private static RedisClient redisClient;
+
+ static {
+ RedisURI redisUri = RedisURI.Builder.redis(AustinFlinkConstant.REDIS_IP)
+ .withPort(Integer.valueOf(AustinFlinkConstant.REDIS_PORT))
+ .withPassword(AustinFlinkConstant.REDIS_PASSWORD.toCharArray())
+ .build();
+ redisClient = RedisClient.create(redisUri);
+ }
+
+
+ /**
+ * Wrap a pipelined (batched) Redis operation
+ */
+ public static void pipeline(RedisPipelineCallBack pipelineCallBack) {
+ StatefulRedisConnection<byte[], byte[]> connect = redisClient.connect(new ByteArrayCodec());
+ connect.setAutoFlushCommands(false); // queue commands locally until flushCommands()
+ RedisAsyncCommands<byte[], byte[]> commands = connect.async();
+
+ List<RedisFuture<?>> futures = pipelineCallBack.invoke(commands);
+
+ commands.flushCommands();
+ LettuceFutures.awaitAll(10, TimeUnit.SECONDS,
+ futures.toArray(new RedisFuture[futures.size()]));
+ connect.close();
+ }
+
+}
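Usage sketch for the pipeline wrapper (key and value bytes are illustrative only): RedisPipelineCallBack has a single abstract method, so a lambda works, mirroring how AustinSink calls it.

    LettuceRedisUtils.pipeline(redisAsyncCommands -> {
        List<RedisFuture<?>> futures = new ArrayList<>();
        futures.add(redisAsyncCommands.lpush("userId".getBytes(), "traceJson".getBytes()));
        futures.add(redisAsyncCommands.expire("userId".getBytes(), 86400));
        return futures;
    });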
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/MessageQueueUtils.java
similarity index 83%
rename from austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java
rename to austin-stream/src/main/java/com/java3y/austin/stream/utils/MessageQueueUtils.java
index 76e71c0..d4de674 100644
--- a/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/MessageQueueUtils.java
@@ -6,12 +6,12 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
/**
- * Flink utility class
+ * Message queue utility class
*
* @author 3y
*/
@Slf4j
-public class FlinkUtils {
+public class MessageQueueUtils {
/**
* Get the kafkaConsumer
*
@@ -19,7 +19,7 @@ public class FlinkUtils {
* @param groupId
* @return
*/
- public KafkaSource<String> getKafkaConsumer(String topicName, String groupId, String broker) {
+ public static KafkaSource<String> getKafkaConsumer(String topicName, String groupId, String broker) {
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(broker)
.setTopics(topicName)
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java
deleted file mode 100644
index 9f26e5c..0000000
--- a/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.java3y.austin.stream.utils;
-
-import cn.hutool.core.collection.CollUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.ClassPathXmlApplicationContext;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author 3y
- * @date 2022/2/15
- * Obtain the SpringContext object
- */
-@Slf4j
-public class SpringContextUtils {
- private static ApplicationContext context;
-
- /**
- * XML config file paths
- */
- private static List<String> xmlPath = new ArrayList<>();
-
- public static ApplicationContext loadContext(String path) {
- return loadContext(new String[]{path});
- }
-
- /**
- * Load the context from the given spring.xml config files
- *
- * @param paths
- * @return
- */
- public static synchronized ApplicationContext loadContext(String[] paths) {
- if (null != paths && paths.length > 0) {
- List newPaths = new ArrayList<>();
- for (String path : paths) {
- if (!xmlPath.contains(path)) {
- newPaths.add(path);
- }
- }
- if (CollUtil.isNotEmpty(newPaths)) {
- String[] array = new String[newPaths.size()];
- for (int i = 0; i < newPaths.size(); i++) {
- array[i] = newPaths.get(i);
- xmlPath.add(newPaths.get(i));
- }
- if (null == context) {
- context = new ClassPathXmlApplicationContext(array);
- } else {
- context = new ClassPathXmlApplicationContext(array, context);
- }
- }
- }
- return context;
- }
-
- /**
- * Look up a bean by its class
- *
- * @param clazz
- * @return
- */
- public static <T> T getBean(Class<T> clazz) {
- return context.getBean(clazz);
- }
-
-}
diff --git a/austin-stream/src/main/resources/austin-spring.xml b/austin-stream/src/main/resources/austin-spring.xml
deleted file mode 100644
index 27c9323..0000000
--- a/austin-stream/src/main/resources/austin-spring.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-
-
-
-
-
-
-
-
-
diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java
index cb7f3e1..57fba4e 100644
--- a/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java
+++ b/austin-support/src/main/java/com/java3y/austin/support/utils/RedisUtils.java
@@ -60,4 +60,24 @@ public class RedisUtils {
log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e));
}
}
+
+ /**
+ * pipeline: hIncrBy each {key -> field} entry and set an expiration on the key
+ *
+ * @param seconds expiration time (seconds)
+ * @param delta increment step
+ */
+ public void pipelineHashIncrByEX(Map<String, String> keyValues, Long seconds, Long delta) {
+ try {
+ redisTemplate.executePipelined((RedisCallback<String>) connection -> {
+ for (Map.Entry<String, String> entry : keyValues.entrySet()) {
+ connection.hIncrBy(entry.getKey().getBytes(), entry.getValue().getBytes(), delta);
+ connection.expire(entry.getKey().getBytes(), seconds);
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e));
+ }
+ }
}
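A usage sketch for the new method, with hypothetical ids: each map key is the hash key (businessId) and each value is the hash field (the state code) to increment, matching the layout AustinSink writes on the Flink side; redisUtils here stands for the injected RedisUtils bean.

    Map<String, String> keyValues = new HashMap<>();
    keyValues.put("100000120220222", "60");   // key: businessId, field: state code
    keyValues.put("100000220220222", "60");
    redisUtils.pipelineHashIncrByEX(keyValues, 30 * 24 * 60 * 60L, 1L);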
diff --git a/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java b/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java
new file mode 100644
index 0000000..9855f5b
--- /dev/null
+++ b/austin-web/src/main/java/com/java3y/austin/web/controller/DataController.java
@@ -0,0 +1,34 @@
+package com.java3y.austin.web.controller;
+
+import com.java3y.austin.common.vo.BasicResultVO;
+import com.java3y.austin.support.domain.MessageTemplate;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+
+/**
+ * Data query endpoints (full-link tracing)
+ *
+ * @author 3y
+ */
+@Slf4j
+@RestController
+@RequestMapping("/messageTemplate")
+@Api("获取数据接口(全链路追踪)")
+@CrossOrigin(origins = "http://localhost:3000", allowCredentials = "true", allowedHeaders = "*")
+public class DataController {
+
+ /**
+ * Query full-link trace data
+ * (placeholder, not implemented yet)
+ */
+ @PostMapping("/data")
+ @ApiOperation("/获取数据")
+ public BasicResultVO getData(@RequestBody MessageTemplate messageTemplate) {
+
+
+ return BasicResultVO.success();
+ }
+
+}
diff --git a/austin-web/src/main/java/com/java3y/austin/web/vo/DataParam.java b/austin-web/src/main/java/com/java3y/austin/web/vo/DataParam.java
new file mode 100644
index 0000000..ad316bb
--- /dev/null
+++ b/austin-web/src/main/java/com/java3y/austin/web/vo/DataParam.java
@@ -0,0 +1,32 @@
+package com.java3y.austin.web.vo;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Full-link trace request parameters
+ *
+ * @author 3y
+ * @date 2022/2/22
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class DataParam {
+ /**
+ * Pass a userId to view that user's message trace
+ */
+ private String userId;
+
+
+ /**
+ * Business id (used for data tracing)
+ * See TaskInfoUtils for how it is generated
+ */
+ private Long businessId;
+
+
+}
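One possible read path for DataController.getData(), shown only as a sketch and not part of this patch: look up the per-user list that AustinSink writes, keyed by DataParam.userId. StringRedisTemplate is standard Spring Data Redis; the success(data) overload of BasicResultVO is assumed to exist.

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @PostMapping("/data")
    public BasicResultVO getData(@RequestBody DataParam dataParam) {
        // Each element is a SimpleAnchorInfo JSON string pushed by AustinSink
        List<String> trace = stringRedisTemplate.opsForList().range(dataParam.getUserId(), 0, -1);
        return BasicResultVO.success(trace);
    }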