diff --git a/austin-stream/pom.xml b/austin-stream/pom.xml index d2ae418..be9e53a 100644 --- a/austin-stream/pom.xml +++ b/austin-stream/pom.xml @@ -43,20 +43,13 @@ lombok + com.java3y.austin - austin-common + austin-support 0.0.1-SNAPSHOT - - com.alibaba - fastjson - - - org.springframework.boot - spring-boot-starter - @@ -71,6 +64,17 @@ shade + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + @@ -94,7 +98,16 @@ + + + org.springframework.boot + spring-boot-maven-plugin + 2.2.2.RELEASE + + + + \ No newline at end of file diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java index 31c8216..85f896b 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/AustinBootStrap.java @@ -8,6 +8,7 @@ import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.springframework.context.ApplicationContext; /** * flink启动类 @@ -22,9 +23,10 @@ public class AustinBootStrap { String topicName = "austinTopicV2"; String groupId = "austinTopicV23"; + ApplicationContext applicationContext = SpringContextUtils.loadContext("classpath*:austin-spring.xml"); + FlinkUtils flinkUtils = applicationContext.getBean(FlinkUtils.class); + KafkaSource kafkaConsumer = flinkUtils.getKafkaConsumer(topicName, groupId); - KafkaSource kafkaConsumer = SpringContextUtils.getBean(FlinkUtils.class) - .getKafkaConsumer(topicName, groupId); DataStreamSource kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafkaSource"); kafkaSource.addSink(new SinkFunction() { @Override @@ -32,7 +34,10 @@ public class AustinBootStrap { log.error("kafka value:{}", value); } }); -// DataStream stream = envBatchPendingThread + + + + // DataStream stream = envBatchPendingThread // .addSource(new AustinSource()) // .name("transactions"); // diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java index a8dcdbf..d605a0c 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/FlinkUtils.java @@ -3,20 +3,14 @@ package com.java3y.austin.stream.utils; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; /** * flink工具类 * * @author 3y */ -@Component public class FlinkUtils { - @Value("${spring.kafka.bootstrap-servers}") - private String broker; - /** * 获取kafkaConsumer * @param topicName @@ -25,7 +19,7 @@ public class FlinkUtils { */ public KafkaSource getKafkaConsumer(String topicName, String groupId) { KafkaSource source = KafkaSource.builder() - .setBootstrapServers(broker) + .setBootstrapServers("ip:port") .setTopics(topicName) .setGroupId(groupId) .setStartingOffsets(OffsetsInitializer.earliest()) diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java index fae8e4e..0ce863f 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/SpringContextUtils.java @@ -1,22 +1,56 @@ package com.java3y.austin.stream.utils; -import org.springframework.beans.BeansException; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import java.util.ArrayList; +import java.util.List; /** * @author 3y * @date 2022/2/15 * 获取SpringContext对象 */ -public class SpringContextUtils implements ApplicationContextAware { - private static ApplicationContext applicationContext; +@Slf4j +public class SpringContextUtils { + private static ApplicationContext context; + + + private static List xmlPath = new ArrayList<>(); - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - applicationContext = applicationContext; + + public static ApplicationContext loadContext(String path) { + return loadContext(new String[]{path}); } + public static synchronized ApplicationContext loadContext(String[] paths) { + if (null != paths && paths.length > 0) { + //筛选新增 + List newPaths = new ArrayList<>(); + for (String path : paths) { + if (!xmlPath.contains(path)) { + log.info("ApplicationContextFactory add new path {}", path); + newPaths.add(path); + } else { + log.info("ApplicationContextFactory already load path {}", path); + } + } + if (!newPaths.isEmpty()) { + String[] array = new String[newPaths.size()]; + for (int i=0; i T getBean(Class clazz) { - return applicationContext.getBean(clazz); + return context.getBean(clazz); } } diff --git a/austin-stream/src/main/resources/austin-spring.xml b/austin-stream/src/main/resources/austin-spring.xml new file mode 100644 index 0000000..27c9323 --- /dev/null +++ b/austin-stream/src/main/resources/austin-spring.xml @@ -0,0 +1,11 @@ + + + + + + + + +