pull/4/head
3y 4 years ago
parent 93b6c1746a
commit 4871a8f42a

@ -43,20 +43,13 @@
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.java3y.austin</groupId> <groupId>com.java3y.austin</groupId>
<artifactId>austin-common</artifactId> <artifactId>austin-support</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>0.0.1-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
@ -71,6 +64,17 @@
<goal>shade</goal> <goal>shade</goal>
</goals> </goals>
<configuration> <configuration>
<filters>
<filter>
<!-- 不要拷贝 META-INF 目录下的签名,否则会引起 SecurityExceptions 。 -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers> <transformers>
<transformer <transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
@ -94,7 +98,16 @@
</configuration> </configuration>
</execution> </execution>
</executions> </executions>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
</dependencies>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
</project> </project>

@ -8,6 +8,7 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.springframework.context.ApplicationContext;
/** /**
* flink * flink
@ -22,9 +23,10 @@ public class AustinBootStrap {
String topicName = "austinTopicV2"; String topicName = "austinTopicV2";
String groupId = "austinTopicV23"; String groupId = "austinTopicV23";
ApplicationContext applicationContext = SpringContextUtils.loadContext("classpath*:austin-spring.xml");
FlinkUtils flinkUtils = applicationContext.getBean(FlinkUtils.class);
KafkaSource<String> kafkaConsumer = flinkUtils.getKafkaConsumer(topicName, groupId);
KafkaSource<String> kafkaConsumer = SpringContextUtils.getBean(FlinkUtils.class)
.getKafkaConsumer(topicName, groupId);
DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkaSource"); DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkaSource");
kafkaSource.addSink(new SinkFunction<String>() { kafkaSource.addSink(new SinkFunction<String>() {
@Override @Override
@ -32,6 +34,9 @@ public class AustinBootStrap {
log.error("kafka value:{}", value); log.error("kafka value:{}", value);
} }
}); });
// DataStream<AnchorInfo> stream = envBatchPendingThread // DataStream<AnchorInfo> stream = envBatchPendingThread
// .addSource(new AustinSource()) // .addSource(new AustinSource())
// .name("transactions"); // .name("transactions");

@ -3,20 +3,14 @@ package com.java3y.austin.stream.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/** /**
* flink * flink
* *
* @author 3y * @author 3y
*/ */
@Component
public class FlinkUtils { public class FlinkUtils {
@Value("${spring.kafka.bootstrap-servers}")
private String broker;
/** /**
* kafkaConsumer * kafkaConsumer
* @param topicName * @param topicName
@ -25,7 +19,7 @@ public class FlinkUtils {
*/ */
public KafkaSource<String> getKafkaConsumer(String topicName, String groupId) { public KafkaSource<String> getKafkaConsumer(String topicName, String groupId) {
KafkaSource<String> source = KafkaSource.<String>builder() KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(broker) .setBootstrapServers("ip:port")
.setTopics(topicName) .setTopics(topicName)
.setGroupId(groupId) .setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest()) .setStartingOffsets(OffsetsInitializer.earliest())

@ -1,22 +1,56 @@
package com.java3y.austin.stream.utils; package com.java3y.austin.stream.utils;
import org.springframework.beans.BeansException; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.ArrayList;
import java.util.List;
/** /**
* @author 3y * @author 3y
* @date 2022/2/15 * @date 2022/2/15
* SpringContext * SpringContext
*/ */
public class SpringContextUtils implements ApplicationContextAware { @Slf4j
private static ApplicationContext applicationContext; public class SpringContextUtils {
private static ApplicationContext context;
private static List<String> xmlPath = new ArrayList<>();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public static ApplicationContext loadContext(String path) {
applicationContext = applicationContext; return loadContext(new String[]{path});
} }
public static synchronized ApplicationContext loadContext(String[] paths) {
if (null != paths && paths.length > 0) {
//筛选新增
List<String> newPaths = new ArrayList<>();
for (String path : paths) {
if (!xmlPath.contains(path)) {
log.info("ApplicationContextFactory add new path {}", path);
newPaths.add(path);
} else {
log.info("ApplicationContextFactory already load path {}", path);
}
}
if (!newPaths.isEmpty()) {
String[] array = new String[newPaths.size()];
for (int i=0; i<newPaths.size(); i++) {
array[i] = newPaths.get(i);
xmlPath.add(newPaths.get(i));
}
if (null == context) {
context = new ClassPathXmlApplicationContext(array);
} else {
context = new ClassPathXmlApplicationContext(array, context);
}
}
}
return context;
}
/** /**
* beanclass * beanclass
@ -25,7 +59,7 @@ public class SpringContextUtils implements ApplicationContextAware {
* @return * @return
*/ */
public static <T> T getBean(Class<T> clazz) { public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz); return context.getBean(clazz);
} }
} }

@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="flinkUtils" class="com.java3y.austin.stream.utils.FlinkUtils"></bean>
</beans>
Loading…
Cancel
Save