diff --git a/austin-data-house/pom.xml b/austin-data-house/pom.xml index 30b5381..dba1fab 100644 --- a/austin-data-house/pom.xml +++ b/austin-data-house/pom.xml @@ -23,13 +23,13 @@ org.apache.flink flink-connector-hive_2.12 - 1.16.0 + ${flink.version} provided org.apache.flink flink-table-api-java-bridge - 1.16.0 + ${flink.version} provided diff --git a/austin-stream/pom.xml b/austin-stream/pom.xml index 3bc7137..f995bf6 100644 --- a/austin-stream/pom.xml +++ b/austin-stream/pom.xml @@ -15,26 +15,27 @@ org.apache.flink - flink-walkthrough-common_${scala.binary.version} + flink-walkthrough-common ${flink.version} org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} provided org.apache.flink - flink-clients_${scala.binary.version} + flink-clients ${flink.version} provided + org.apache.flink - flink-connector-kafka_${scala.binary.version} + flink-connector-kafka ${flink.version} @@ -85,7 +86,7 @@ META-INF/spring.schemas + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> com.java3y.austin.stream.AustinBootStrap diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java index 7bf2262..d02afae 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java @@ -22,10 +22,21 @@ public class AustinFlinkConstant { * TODO 使用前配置redis ip:port * (真实网络ip,这里不能用配置的hosts,看语雀文档得到真实ip) */ + + public static final String REDIS_MODE_SENTINEL = "SENTINEL"; + + public static final String REDIS_MODE_SINGLE = "SINGLE"; + + public static final String REDIS_MODE = REDIS_MODE_SENTINEL; + public static final String REDIS_IP = "austin-redis"; - public static final String REDIS_PORT = "6379"; + + public static final Integer REDIS_PORT = 6379; + public static final String REDIS_PASSWORD = "austin"; + public static final String MASTER_ID = "mymaster"; + /** * Flink流程常量 diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java b/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java index 0c64cd2..7c9269a 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/utils/LettuceRedisUtils.java @@ -1,5 +1,6 @@ package com.java3y.austin.stream.utils; +import cn.hutool.core.util.StrUtil; import com.java3y.austin.stream.callback.RedisPipelineCallBack; import com.java3y.austin.stream.constants.AustinFlinkConstant; import io.lettuce.core.LettuceFutures; @@ -26,10 +27,18 @@ public class LettuceRedisUtils { private static RedisClient redisClient; static { - RedisURI redisUri = RedisURI.Builder.redis(AustinFlinkConstant.REDIS_IP) - .withPort(Integer.valueOf(AustinFlinkConstant.REDIS_PORT)) - .withPassword(AustinFlinkConstant.REDIS_PASSWORD.toCharArray()) - .build(); + RedisURI redisUri = null; + if (StrUtil.equals(AustinFlinkConstant.REDIS_MODE_SENTINEL, AustinFlinkConstant.REDIS_MODE)) { + redisUri = RedisURI.Builder.sentinel(AustinFlinkConstant.REDIS_IP, AustinFlinkConstant.MASTER_ID) + .withPassword(AustinFlinkConstant.REDIS_PASSWORD.toCharArray()) + .build(); + } else if (StrUtil.equals(AustinFlinkConstant.REDIS_MODE_SINGLE, AustinFlinkConstant.REDIS_MODE)) { + redisUri = RedisURI.Builder.redis(AustinFlinkConstant.REDIS_IP) + .withPort(Integer.valueOf(AustinFlinkConstant.REDIS_PORT)) + .withPassword(AustinFlinkConstant.REDIS_PASSWORD.toCharArray()) + .build(); + } + redisClient = RedisClient.create(redisUri); } diff --git a/pom.xml b/pom.xml index 7692650..80e2a69 100644 --- a/pom.xml +++ b/pom.xml @@ -32,9 +32,8 @@ 1.8 UTF-8 - 1.14.3 + 1.17.1 1.8 - 2.11 ${target.java.version} ${target.java.version} 2.17.1 @@ -54,7 +53,7 @@ cn.hutool hutool-all - 5.7.15 + 5.8.16 @@ -135,21 +134,28 @@ 2.3.0 + + + org.apache.kafka + kafka-clients + 3.2.3 + + org.apache.flink - flink-walkthrough-common_${scala.binary.version} + flink-walkthrough-common ${flink.version} org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} provided org.apache.flink - flink-clients_${scala.binary.version} + flink-clients ${flink.version} provided @@ -157,7 +163,7 @@ org.apache.flink - flink-connector-kafka_${scala.binary.version} + flink-connector-kafka ${flink.version}