austin-steam模块(flink)

1、去除spring环境
2、封装redis工具类(无spring环境)
3、编写实时模块(数据多维度进redis)
pull/6/head
3y 3 years ago
parent e3fe1e12c7
commit f196f07d38

@ -44,7 +44,6 @@
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>

@ -1,14 +1,16 @@
package com.java3y.austin.stream; package com.java3y.austin.stream;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.stream.constants.AustinFlinkConstant; import com.java3y.austin.stream.constants.AustinFlinkConstant;
import com.java3y.austin.stream.utils.FlinkUtils; import com.java3y.austin.stream.function.AustinFlatMapFunction;
import com.java3y.austin.stream.utils.SpringContextUtils; import com.java3y.austin.stream.sink.AustinSink;
import com.java3y.austin.stream.utils.MessageQueueUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/** /**
* flink * flink
@ -20,29 +22,25 @@ public class AustinBootStrap {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SpringContextUtils.loadContext(AustinFlinkConstant.SPRING_CONFIG_PATH);
/** /**
* 1.KafkaConsumer * 1.KafkaConsumer
*/ */
KafkaSource<String> kafkaConsumer = SpringContextUtils.getBean(FlinkUtils.class).getKafkaConsumer(AustinFlinkConstant.TOPIC_NAME, AustinFlinkConstant.GROUP_ID, AustinFlinkConstant.BROKER); KafkaSource<String> kafkaConsumer = MessageQueueUtils.getKafkaConsumer(AustinFlinkConstant.TOPIC_NAME, AustinFlinkConstant.GROUP_ID, AustinFlinkConstant.BROKER);
DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), AustinFlinkConstant.SOURCE_NAME); DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), AustinFlinkConstant.SOURCE_NAME);
/** /**
* 2. * 2.
*/ */
SingleOutputStreamOperator<AnchorInfo> dataStream = kafkaSource.flatMap(new AustinFlatMapFunction()).name(AustinFlinkConstant.FUNCTION_NAME);
/** /**
* 3. Redis()线hive() * 3. Redis()线hive()
*/ */
kafkaSource.addSink(new SinkFunction<String>() { dataStream.addSink(new AustinSink()).name(AustinFlinkConstant.SINK_NAME);
@Override env.execute(AustinFlinkConstant.JOB_NAME);
public void invoke(String value, Context context) throws Exception {
log.error("kafka value:{}", value);
}
});
env.execute("AustinBootStrap");
} }
} }

@ -0,0 +1,23 @@
package com.java3y.austin.stream.callback;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.RedisAsyncCommands;
import java.util.List;
/**
* redis pipeline
*
* @author 3y
*/
public interface RedisPipelineCallBack {
/**
*
*
* @param redisAsyncCommands
* @return
*/
List<RedisFuture<?>> invoke(RedisAsyncCommands redisAsyncCommands);
}

@ -1,20 +1,26 @@
package com.java3y.austin.stream.constants; package com.java3y.austin.stream.constants;
/**
* Flink
* @author 3y
*/
public class AustinFlinkConstant { public class AustinFlinkConstant {
/** /**
* Kafka * Kafka
* TODO 使broker * TODO 使kafka broker ip:port
*/ */
public static final String GROUP_ID = "austinLogGroup"; public static final String GROUP_ID = "austinLogGroup";
public static final String TOPIC_NAME = "austinLog"; public static final String TOPIC_NAME = "austinLog";
public static final String BROKER = "ip:port"; public static final String BROKER = "ip:port";
/** /**
* spring * redis
* TODO 使redis ip:port
*/ */
public static final String SPRING_CONFIG_PATH = "classpath*:austin-spring.xml"; public static final String REDIS_IP = "ip";
public static final String REDIS_PORT = "port";
public static final String REDIS_PASSWORD = "austin";
/** /**
@ -23,5 +29,7 @@ public class AustinFlinkConstant {
public static final String SOURCE_NAME = "austin_kafka_source"; public static final String SOURCE_NAME = "austin_kafka_source";
public static final String FUNCTION_NAME = "austin_transfer"; public static final String FUNCTION_NAME = "austin_transfer";
public static final String SINK_NAME = "austin_sink"; public static final String SINK_NAME = "austin_sink";
public static final String JOB_NAME = "AustinBootStrap";
} }

@ -0,0 +1,34 @@
package com.java3y.austin.stream.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* @author 3y
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SimpleAnchorInfo {
/**
*
*/
private int state;
/**
* Id(使)
* TaskInfoUtils
*/
private Long businessId;
/**
*
*/
private long timestamp;
}

@ -0,0 +1,20 @@
package com.java3y.austin.stream.function;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
/**
* @author 3y
* @date 2022/2/22
* process
*/
public class AustinFlatMapFunction implements FlatMapFunction<String, AnchorInfo> {
@Override
public void flatMap(String value, Collector<AnchorInfo> collector) throws Exception {
AnchorInfo anchorInfo = JSON.parseObject(value, AnchorInfo.class);
collector.collect(anchorInfo);
}
}

@ -1,20 +1,99 @@
package com.java3y.austin.stream.sink; package com.java3y.austin.stream.sink;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.stream.domain.SimpleAnchorInfo;
import com.java3y.austin.stream.utils.LettuceRedisUtils;
import io.lettuce.core.RedisFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/** /**
* mock * redis/hive
*
* @author 3y
*/ */
@Slf4j @Slf4j
public class AustinSink extends RichSinkFunction<AnchorInfo> { public class AustinSink implements SinkFunction<AnchorInfo> {
/**
* batch
*/
private static final Integer BATCH_SIZE = 10;
private static final Long TIME_OUT = 2000L;
/**
* ThreadLocal
*/
private static ThreadLocal<List<AnchorInfo>> baseAnchors = ThreadLocal.withInitial(() -> new ArrayList<>(BATCH_SIZE));
private static ThreadLocal<Long> lastClearTime = ThreadLocal.withInitial(() -> new Long(System.currentTimeMillis()));
@Override @Override
public void invoke(AnchorInfo value, Context context) throws Exception { public void invoke(AnchorInfo anchorInfo) throws Exception {
realTimeData(anchorInfo);
offlineDate(anchorInfo);
}
/**
* Redis
* 1.()
* 2.()30
*
* @param anchorInfo
*/
private void realTimeData(AnchorInfo anchorInfo) {
baseAnchors.get().add(anchorInfo);
if (baseAnchors.get().size() >= BATCH_SIZE || System.currentTimeMillis() - lastClearTime.get() >= TIME_OUT) {
log.error("sink consume value:{}", JSON.toJSONString(value)); try {
LettuceRedisUtils.pipeline(redisAsyncCommands -> {
List<RedisFuture<?>> redisFutures = new ArrayList<>();
for (AnchorInfo info : baseAnchors.get()) {
/**
* 1.userId list:{key,list}
* key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}]
*/
SimpleAnchorInfo simpleAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getTimestamp()).build();
for (String id : info.getIds()) {
redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(simpleAnchorInfo).getBytes()));
redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
}
/**
* 2. hash:{key,hash}
* key:businessId,hashValue:{state,stateCount}
*/
redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(),
String.valueOf(info.getState()).getBytes(), info.getIds().size()));
redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(), DateUtil.offsetDay(new Date(), 30).getTime()));
}
return redisFutures;
});
} catch (Exception e) {
log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e));
} finally {
lastClearTime.set(System.currentTimeMillis());
baseAnchors.get().clear();
}
}
}
/**
* 线hive
*
* @param anchorInfo
*/
private void offlineDate(AnchorInfo anchorInfo) {
} }
} }

@ -0,0 +1,52 @@
package com.java3y.austin.stream.utils;
import com.java3y.austin.stream.callback.RedisPipelineCallBack;
import com.java3y.austin.stream.constants.AustinFlinkConstant;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* @author 3y
* @date 2022/2/22
* Spring使RedisLettuce
*/
public class LettuceRedisUtils {
/**
* redisClient
*/
private static RedisClient redisClient;
static {
RedisURI redisUri = RedisURI.Builder.redis(AustinFlinkConstant.REDIS_IP)
.withPort(Integer.valueOf(AustinFlinkConstant.REDIS_PORT))
.withPassword(AustinFlinkConstant.REDIS_PASSWORD.toCharArray())
.build();
redisClient = RedisClient.create(redisUri);
}
/**
* pipeline
*/
public static void pipeline(RedisPipelineCallBack pipelineCallBack) {
StatefulRedisConnection<byte[], byte[]> connect = redisClient.connect(new ByteArrayCodec());
RedisAsyncCommands<byte[], byte[]> commands = connect.async();
List<RedisFuture<?>> futures = pipelineCallBack.invoke(commands);
commands.flushCommands();
LettuceFutures.awaitAll(10, TimeUnit.SECONDS,
futures.toArray(new RedisFuture[futures.size()]));
connect.close();
}
}

@ -6,12 +6,12 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
/** /**
* flink *
* *
* @author 3y * @author 3y
*/ */
@Slf4j @Slf4j
public class FlinkUtils { public class MessageQueueUtils {
/** /**
* kafkaConsumer * kafkaConsumer
* *
@ -19,7 +19,7 @@ public class FlinkUtils {
* @param groupId * @param groupId
* @return * @return
*/ */
public KafkaSource<String> getKafkaConsumer(String topicName, String groupId, String broker) { public static KafkaSource<String> getKafkaConsumer(String topicName, String groupId, String broker) {
KafkaSource<String> source = KafkaSource.<String>builder() KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(broker) .setBootstrapServers(broker)
.setTopics(topicName) .setTopics(topicName)

@ -1,69 +0,0 @@
package com.java3y.austin.stream.utils;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.ArrayList;
import java.util.List;
/**
* @author 3y
* @date 2022/2/15
* SpringContext
*/
@Slf4j
public class SpringContextUtils {
private static ApplicationContext context;
/**
* XML
*/
private static List<String> xmlPath = new ArrayList<>();
public static ApplicationContext loadContext(String path) {
return loadContext(new String[]{path});
}
/**
* spring.xml context
*
* @param paths
* @return
*/
public static synchronized ApplicationContext loadContext(String[] paths) {
if (null != paths && paths.length > 0) {
List<String> newPaths = new ArrayList<>();
for (String path : paths) {
if (!xmlPath.contains(path)) {
newPaths.add(path);
}
}
if (CollUtil.isNotEmpty(newPaths)) {
String[] array = new String[newPaths.size()];
for (int i = 0; i < newPaths.size(); i++) {
array[i] = newPaths.get(i);
xmlPath.add(newPaths.get(i));
}
if (null == context) {
context = new ClassPathXmlApplicationContext(array);
} else {
context = new ClassPathXmlApplicationContext(array, context);
}
}
}
return context;
}
/**
* beanclass
*
* @param clazz
* @return
*/
public static <T> T getBean(Class<T> clazz) {
return context.getBean(clazz);
}
}

@ -1,11 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="flinkUtils" class="com.java3y.austin.stream.utils.FlinkUtils"></bean>
</beans>

@ -60,4 +60,24 @@ public class RedisUtils {
log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e)); log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e));
} }
} }
/**
* pipeline key-value
*
* @param seconds
* @param delta
*/
public void pipelineHashIncrByEX(Map<String, String> keyValues, Long seconds, Long delta) {
try {
redisTemplate.executePipelined((RedisCallback<String>) connection -> {
for (Map.Entry<String, String> entry : keyValues.entrySet()) {
connection.hIncrBy(entry.getKey().getBytes(), entry.getValue().getBytes(), delta);
connection.expire(entry.getKey().getBytes(), seconds);
}
return null;
});
} catch (Exception e) {
log.error("redis pipelineSetEX fail! e:{}", Throwables.getStackTraceAsString(e));
}
}
} }

@ -0,0 +1,34 @@
package com.java3y.austin.web.controller;
import com.java3y.austin.common.vo.BasicResultVO;
import com.java3y.austin.support.domain.MessageTemplate;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
/**
* )
*
* @author 3y
*/
@Slf4j
@RestController
@RequestMapping("/messageTemplate")
@Api("获取数据接口(全链路追踪)")
@CrossOrigin(origins = "http://localhost:3000", allowCredentials = "true", allowedHeaders = "*")
public class DataController {
/**
* Id
* Id
*/
@PostMapping("/data")
@ApiOperation("/获取数据")
public BasicResultVO getData(@RequestBody MessageTemplate messageTemplate) {
return BasicResultVO.success();
}
}

@ -0,0 +1,32 @@
package com.java3y.austin.web.vo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
*
* @author 3y
* @date 2022/2/22
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DataParam {
/**
* userId
*/
private String userId;
/**
* Id(使)
* TaskInfoUtils
*/
private Long businessId;
}
Loading…
Cancel
Save