Merge branch 'flink'

pull/4/head
3y 3 years ago
commit 93b6c1746a

@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>austin</artifactId>
<groupId>com.java3y.austin</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>austin-stream</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- This dependency is provided, because it should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.java3y.austin</groupId>
<artifactId>austin-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.java3y.austin.stream.AustinBootStrap</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,44 @@
package com.java3y.austin.stream;
import com.java3y.austin.stream.utils.FlinkUtils;
import com.java3y.austin.stream.utils.SpringContextUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
/**
* flink
*
* @author 3y
*/
@Slf4j
public class AustinBootStrap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String topicName = "austinTopicV2";
String groupId = "austinTopicV23";
KafkaSource<String> kafkaConsumer = SpringContextUtils.getBean(FlinkUtils.class)
.getKafkaConsumer(topicName, groupId);
DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkaSource");
kafkaSource.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
log.error("kafka value:{}", value);
}
});
// DataStream<AnchorInfo> stream = envBatchPendingThread
// .addSource(new AustinSource())
// .name("transactions");
//
// stream.addSink(new AustinSink());
env.execute("AustinBootStrap");
}
}

@ -0,0 +1,17 @@
package com.java3y.austin.stream.sink;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.AnchorInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@Slf4j
public class AustinSink extends RichSinkFunction<AnchorInfo> {
@Override
public void invoke(AnchorInfo value, Context context) throws Exception {
log.error("sink consume value:{}", JSON.toJSONString(value));
}
}

@ -0,0 +1,34 @@
package com.java3y.austin.stream.source;
import com.java3y.austin.common.domain.AnchorInfo;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.util.ArrayList;
import java.util.List;
/**
* mock
*
* @author 3y
*/
public class AustinSource extends RichSourceFunction<AnchorInfo> {
@Override
public void run(SourceContext<AnchorInfo> sourceContext) throws Exception {
List<AnchorInfo> anchorInfoList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
anchorInfoList.add(AnchorInfo.builder()
.state(10).businessId(333L)
.timestamp(System.currentTimeMillis()).build());
}
for (AnchorInfo anchorInfo : anchorInfoList) {
sourceContext.collect(anchorInfo);
}
}
@Override
public void cancel() {
}
}

@ -0,0 +1,36 @@
package com.java3y.austin.stream.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* flink
*
* @author 3y
*/
@Component
public class FlinkUtils {
@Value("${spring.kafka.bootstrap-servers}")
private String broker;
/**
* kafkaConsumer
* @param topicName
* @param groupId
* @return
*/
public KafkaSource<String> getKafkaConsumer(String topicName, String groupId) {
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(broker)
.setTopics(topicName)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
return source;
}
}

@ -0,0 +1,31 @@
package com.java3y.austin.stream.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
* @author 3y
* @date 2022/2/15
* SpringContext
*/
public class SpringContextUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
applicationContext = applicationContext;
}
/**
* beanclass
*
* @param clazz
* @return
*/
public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
}
}

@ -12,6 +12,7 @@
<module>austin-support</module> <module>austin-support</module>
<module>austin-handler</module> <module>austin-handler</module>
<module>austin-cron</module> <module>austin-cron</module>
<module>austin-stream</module>
</modules> </modules>
<parent> <parent>
@ -29,6 +30,13 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.14.3</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.1</log4j.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
@ -118,6 +126,32 @@
<version>2.3.0</version> <version>2.3.0</version>
</dependency> </dependency>
<!--flink相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!--kafkaFlink连接器-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>

Loading…
Cancel
Save