diff --git a/INSTALL.md b/INSTALL.md
index 17df58d..4c56b33 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -187,13 +187,13 @@
 docker ps
 docker exec -it kafka sh
 ```
 
-Create two topics (here my **topicName**s are austinBusiness and austinLog; you can change them to your own)
+Create three topics (here my **topicName**s are austinBusiness, austinTraceLog and austinRecall; you can change them to your own)
 
 ```
 $KAFKA_HOME/bin/kafka-topics.sh --create --topic austinBusiness --partitions 1 --zookeeper zookeeper:2181 --replication-factor 1
-$KAFKA_HOME/bin/kafka-topics.sh --create --topic austinLog --partitions 1 --zookeeper zookeeper:2181 --replication-factor 1
+$KAFKA_HOME/bin/kafka-topics.sh --create --topic austinTraceLog --partitions 1 --zookeeper zookeeper:2181 --replication-factor 1
 $KAFKA_HOME/bin/kafka-topics.sh --create --topic austinRecall --partitions 1 --zookeeper zookeeper:2181 --replication-factor 1
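If you prefer to create the topics from code rather than inside the container shell, a minimal sketch with the Kafka `AdminClient` does the same thing. The class name `CreateAustinTopics` and the broker address `localhost:9092` are assumptions for illustration only; point them at your own environment.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateAustinTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // assumption: broker reachable at localhost:9092 -- adjust to your deployment
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- mirrors the shell commands above
            admin.createTopics(Arrays.asList(
                    new NewTopic("austinBusiness", 1, (short) 1),
                    new NewTopic("austinTraceLog", 1, (short) 1),
                    new NewTopic("austinRecall", 1, (short) 1)
            )).all().get();
        }
    }
}
```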
diff --git a/austin-common/src/main/java/com/java3y/austin/common/domain/AnchorInfo.java b/austin-common/src/main/java/com/java3y/austin/common/domain/AnchorInfo.java
index f1f15b5..3790649 100644
--- a/austin-common/src/main/java/com/java3y/austin/common/domain/AnchorInfo.java
+++ b/austin-common/src/main/java/com/java3y/austin/common/domain/AnchorInfo.java
@@ -35,8 +35,8 @@ public class AnchorInfo {
 
     /**
-     * time the record was generated
+     * time the log entry was generated
      */
-    private long timestamp;
+    private long logTimestamp;
 
 }
diff --git a/austin-data-house/pom.xml b/austin-data-house/pom.xml
index 2bd15f7..30b5381 100644
--- a/austin-data-house/pom.xml
+++ b/austin-data-house/pom.xml
@@ -14,6 +14,7 @@
 
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
+        <hive-exec.version>2.3.4</hive-exec.version>
     </properties>
 
@@ -22,47 +23,23 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-hive_2.12</artifactId>
-            <version>1.14.3</version>
-
+            <version>1.16.0</version>
+            <scope>provided</scope>
         </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
-            <version>1.14.3</version>
-
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>1.16.0</version>
+            <scope>provided</scope>
         </dependency>
 
         <!-- Hive Dependency -->
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-exec</artifactId>
-            <version>3.1.2</version>
+            <version>${hive-exec.version}</version>
+            <scope>provided</scope>
         </dependency>
-
-        <!--hadoop start-->
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>3.3.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>3.3.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-common</artifactId>
-            <version>3.3.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-            <version>3.3.1</version>
-        </dependency>
-        <!--hadoop end-->
-
     </dependencies>
 
     <build>
         <plugins>
@@ -104,10 +81,10 @@
                                     <resource>META-INF/spring.schemas</resource>
                                 </transformer>
                                 <transformer
-                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>com.java3y.austin.stream.AustinBootStrap</mainClass>
+                                    <mainClass>com.java3y.austin.datahouse.AustinHiveBootStrap</mainClass>
                                 </transformer>
                             </transformers>
                         </configuration>
diff --git a/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinBootStrap.java b/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinBootStrap.java
deleted file mode 100644
index 428b76e..0000000
--- a/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinBootStrap.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.java3y.austin.datahouse;
-
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.SqlDialect;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.catalog.hive.HiveCatalog;
-
-import java.time.Duration;
-
-/**
- * Flink bootstrap class
- *
- * @author 3y
- */
-public class AustinBootStrap {
-
-    public static void main(String[] args) throws Exception {
-
-        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
-        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));
-        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
-
-        String catalogName = "my_hive";
-
-        HiveCatalog catalog = new HiveCatalog(catalogName, "austin_hive", null);
-
-        tableEnv.registerCatalog(catalogName, catalog);
-        tableEnv.useCatalog(catalogName);
-
-        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS austin");
-        tableEnv.executeSql("DROP TABLE IF EXISTS austin.message_anchor_info");
-
-        tableEnv.executeSql("create table austin.message_anchor_info(" +
-                "ids String,\n" +
-                "state String,\n" +
-                "businessId String,\n" +
-                "log_ts Timestamp(3),\n" +
-                "WATERMARK FOR log_ts AS log_ts -INTERVAL '5' SECOND" +
-                ")WITH(" +
-                " 'connector' = 'kafka',\n" +
-                "'topic' = 'austinTraceLog',\n" +
-                " 'properties.bootstrap.servers' = 'kafka_ip:9092',\n" +
-                "'properties.group.id' = 'flink1',\n" +
-                "'scan.startup.mode' = 'earliest-offset',\n" +
-                "'format' = 'json',\n" +
-                "'json.fail-on-missing-field' = 'false',\n" +
-                "'json.ignore-parse-errors' = 'true'" +
-                ")");
-
-
-//        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp");
-//        tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.log_hive");
-//
-//        tableEnv.executeSql(" CREATE TABLE hive_tmp.log_hive (\n" +
-//                "    user_id STRING,\n" +
-//                "    order_amount DOUBLE\n" +
-//                "  ) PARTITIONED BY (\n" +
-//                "    dt STRING,\n" +
-//                "    hr STRING\n" +
-//                "  ) STORED AS PARQUET\n" +
-//                "  TBLPROPERTIES (\n" +
-//                "    'sink.partition-commit.trigger' = 'partition-time',\n" +
-//                "    'sink.partition-commit.delay' = '1 min',\n" +
-//                "    'format' = 'json',\n" +
-//                "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
-//                "    'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'" +
-//                "  )");
-//        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
-//
-//        tableEnv.executeSql("" +
-//                "  INSERT INTO hive_tmp.log_hive\n" +
-//                "  SELECT\n" +
-//                "    user_id,\n" +
-//                "    order_amount,\n" +
-//                "    DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')\n" +
-//                "  FROM austin.message_anchor_info");
-
-    }
-}
diff --git a/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java b/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java
new file mode 100644
index 0000000..e696bdf
--- /dev/null
+++ b/austin-data-house/src/main/java/com/java3y/austin/datahouse/AustinHiveBootStrap.java
@@ -0,0 +1,80 @@
+package com.java3y.austin.datahouse;
+
+
+import com.java3y.austin.datahouse.constants.DataHouseConstant;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+
+/**
+ * Flink bootstrap class
+ * <p>
+ * Consumes messages from Kafka and writes them into a Hive table
+ *
+ * @author 3y
+ */
+public class AustinHiveBootStrap {
+
+    public static void main(String[] args) {
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1).enableCheckpointing(3000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        // 1. Initialize the Hive catalog
+        String catalogName = DataHouseConstant.CATALOG_NAME;
+        HiveCatalog catalog = new HiveCatalog(catalogName, DataHouseConstant.CATALOG_DEFAULT_DATABASE, null);
+        tableEnv.registerCatalog(catalogName, catalog);
+        tableEnv.useCatalog(catalogName);
+
+        // 2. Create the Kafka source table
+        String kafkaSourceTableCreate = "DROP TABLE IF EXISTS " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SOURCE_TABLE_NAME;
+        tableEnv.executeSql(kafkaSourceTableCreate);
+        String kafkaSourceTableDDL = "CREATE TABLE <CATALOG_DEFAULT_DATABASE>.<KAFKA_SOURCE_TABLE_NAME> (\n" +
+                "`ids` String,\n" +
+                "`state` String,\n" +
+                "`businessId` String,\n" +
+                "`logTimestamp` String\n" +
+                ") WITH (\n" +
+                " 'connector' = 'kafka'," +
+                " 'topic' = '<KAFKA_TOPIC>',\n" +
+                " 'properties.bootstrap.servers' = '<KAFKA_IP_PORT>',\n" +
+                " 'properties.group.id' = 'group_test_01',\n" +
+                " 'format' = 'json',\n" +
+                " 'json.fail-on-missing-field' = 'true',\n" +
+                " 'json.ignore-parse-errors' = 'false',\n" +
+                " 'scan.topic-partition-discovery.interval'='1s',\n" +
+                " 'scan.startup.mode' = 'latest-offset')";
+        kafkaSourceTableDDL = kafkaSourceTableDDL.replace("<CATALOG_DEFAULT_DATABASE>", DataHouseConstant.CATALOG_DEFAULT_DATABASE)
+                .replace("<KAFKA_SOURCE_TABLE_NAME>", DataHouseConstant.KAFKA_SOURCE_TABLE_NAME)
+                .replace("<KAFKA_IP_PORT>", DataHouseConstant.KAFKA_IP_PORT)
+                .replace("<KAFKA_TOPIC>", DataHouseConstant.KAFKA_TOPIC);
+
+        tableEnv.executeSql(kafkaSourceTableDDL);
+
+        // Create the Hive sink table
+        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tableEnv.executeSql("DROP TABLE IF EXISTS " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SINK_TABLE_NAME);
+        String kafkaSinkTableDDL = "CREATE TABLE IF NOT EXISTS <CATALOG_DEFAULT_DATABASE>.<KAFKA_SINK_TABLE_NAME> (\n" +
+                "`ids` String,\n" +
+                "`state` String,\n" +
+                "`businessId` String,\n" +
+                "`logTimestamp` String\n" +
+                ") STORED AS PARQUET\n" +
+                "TBLPROPERTIES (\n" +
+                " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
+                " 'sink.partition-commit.trigger' = 'partition-time',\n" +
+                " 'sink.partition-commit.delay' = '1 min',\n" +
+                " 'sink.buffer-flush.max-rows'='10',\n" +
+                " 'sink.buffer-flush.interval' = '5s'\n" +
+                ")";
+        kafkaSinkTableDDL = kafkaSinkTableDDL.replace("<CATALOG_DEFAULT_DATABASE>", DataHouseConstant.CATALOG_DEFAULT_DATABASE)
+                .replace("<KAFKA_SINK_TABLE_NAME>", DataHouseConstant.KAFKA_SINK_TABLE_NAME);
+        tableEnv.executeSql(kafkaSinkTableDDL);
+
+        // 3. Write data from the Kafka source table into the Hive sink table
+        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tableEnv.executeSql("INSERT INTO " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SINK_TABLE_NAME + " SELECT ids,state,businessId,logTimestamp FROM " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SOURCE_TABLE_NAME + "");
+
+    }
+}
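To smoke-test the new job, one option is to push a single hand-written record onto `austinTraceLog` and check that it lands in the Hive sink. A rough sketch with `kafka-clients` follows; the class name, broker address and field values are made up for illustration, but the JSON keys mirror the four String columns of the Kafka source table above.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TraceLogSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // assumption: the same broker the Flink job consumes from (DataHouseConstant.KAFKA_IP_PORT)
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // toy values; keys match ids / state / businessId / logTimestamp declared in the source table
        String payload = "{\"ids\":\"66\",\"state\":\"10\",\"businessId\":\"100000000220230101\",\"logTimestamp\":\"1672502400000\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("austinTraceLog", payload));
            producer.flush();
        }
    }
}
```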
diff --git a/austin-data-house/src/main/java/com/java3y/austin/datahouse/constants/DataHouseConstant.java b/austin-data-house/src/main/java/com/java3y/austin/datahouse/constants/DataHouseConstant.java
index 7ae66dd..830de49 100644
--- a/austin-data-house/src/main/java/com/java3y/austin/datahouse/constants/DataHouseConstant.java
+++ b/austin-data-house/src/main/java/com/java3y/austin/datahouse/constants/DataHouseConstant.java
@@ -7,6 +7,37 @@ package com.java3y.austin.datahouse.constants;
  */
 public class DataHouseConstant {
 
-    public static final String HIVE_CONF_ = "austinLogGroup";
+    /**
+     * catalog name
+     */
+    public static final String CATALOG_NAME = "my_hive";
+
+    /**
+     * database name
+     */
+    public static final String CATALOG_DEFAULT_DATABASE = "austin";
+
+    /**
+     * source table that the consumed Kafka messages are written into
+     */
+    public static final String KAFKA_SOURCE_TABLE_NAME = "anchor_log_source";
+
+    /**
+     * table that finally lands in Hive
+     */
+    public static final String KAFKA_SINK_TABLE_NAME = "message_anchor";
+
+    /**
+     * source Kafka topic
+     */
+    public static final String KAFKA_TOPIC = "austinTraceLog";
+
+
+    /**
+     * eg: 127.0.0.1:9092
+     * ip and port of the Kafka broker to consume from
+     */
+    public static final String KAFKA_IP_PORT = "127.0.0.1:9092";
+
 }
diff --git a/austin-data-house/src/main/resources/hive-site.xml b/austin-data-house/src/main/resources/hive-site.xml
index ef0ff43..7b21093 100644
--- a/austin-data-house/src/main/resources/hive-site.xml
+++ b/austin-data-house/src/main/resources/hive-site.xml
@@ -6,7 +6,7 @@
    <!-- database connection used by the metastore -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
-        <value>jdbc:postgresql://3y_ip:5432/metastore?createDatabaseIfNotExist=true</value>
+        <value>jdbc:postgresql://hive_ip:5432/metastore?createDatabaseIfNotExist=true</value>
        <description>JDBC connect string for a JDBC metastore</description>
    </property>
 
@@ -30,7 +30,7 @@
 
    <property>
        <name>hive.metastore.uris</name>
-        <value>thrift://3y_ip:9083</value>
+        <value>thrift://hive_ip:9083</value>
        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
        </description>
    </property>
@@ -40,32 +40,4 @@
        <value>true</value>
    </property>
 
-    <property>
-        <name>hive.server2.logging.operation.log.location</name>
-        <value>/root/sd/apache-hive-2.3.4-bin/tmp/operation_logs</value>
-        <description>Top level directory where operation logs are stored if logging functionality is enabled
-        </description>
-    </property>
-
-    <property>
-        <name>hive.exec.scratchdir</name>
-        <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive</value>
-        <description>HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. For each
-            connecting
-            user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with
-            ${hive.scratch.dir.permission}.
-        </description>
-    </property>
-
-    <property>
-        <name>hive.exec.local.scratchdir</name>
-        <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/local</value>
-        <description>Local scratch space for Hive jobs</description>
-    </property>
-
-    <property>
-        <name>hive.downloaded.resources.dir</name>
-        <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/resources</value>
-        <description>Temporary local directory for added resources in the remote file system.</description>
-    </property>
 </configuration>
\ No newline at end of file
diff --git a/austin-data-house/src/main/resources/sql-client-defaults.yaml b/austin-data-house/src/main/resources/sql-client-defaults.yaml
deleted file mode 100644
index f24ef2d..0000000
--- a/austin-data-house/src/main/resources/sql-client-defaults.yaml
+++ /dev/null
@@ -1,5 +0,0 @@
-catalogs:
-  - name: my_hive
-    type: hive
-    default-database: austin_hive
-    hive-conf-dir: /opt/flink/conf
\ No newline at end of file
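Because the job now reads `hive-site.xml` straight from the classpath, it can be worth confirming that the metastore configured in that file is reachable before submitting the jar. A hedged sketch using the metastore client bundled with `hive-exec` (the class name `MetastoreSmokeTest` is made up; behaviour is assumed for Hive 2.3.x):

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class MetastoreSmokeTest {
    public static void main(String[] args) throws Exception {
        // HiveConf picks up hive-site.xml from the classpath, i.e. the file edited above
        HiveConf conf = new HiveConf();
        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        try {
            // listing databases is enough to prove thrift://hive_ip:9083 is reachable
            System.out.println(client.getAllDatabases());
        } finally {
            client.close();
        }
    }
}
```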
diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java
index 49c5687..126f2b8 100644
--- a/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java
+++ b/austin-stream/src/main/java/com/java3y/austin/stream/sink/AustinSink.java
@@ -44,7 +44,7 @@ public class AustinSink implements SinkFunction<AnchorInfo> {
                  * 1. Build the trace info keyed by userId, data structure list: {key, list}
                  * key: userId, listValue: [{timestamp,state,businessId},{timestamp,state,businessId}]
                  */
-                SimpleAnchorInfo simpleAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getTimestamp()).build();
+                SimpleAnchorInfo simpleAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build();
                 for (String id : info.getIds()) {
                     redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(simpleAnchorInfo).getBytes()));
                     redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java
index 6172227..3464c97 100644
--- a/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java
+++ b/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java
@@ -10,7 +10,6 @@ import com.java3y.austin.support.mq.SendMqService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
 /**
@@ -48,7 +47,7 @@ public class LogUtils extends CustomLogListener {
     /**
      * record the anchor (tracking) info
      */
    public void print(AnchorInfo anchorInfo) {
-        anchorInfo.setTimestamp(System.currentTimeMillis());
+        anchorInfo.setLogTimestamp(System.currentTimeMillis());
        String message = JSON.toJSONString(anchorInfo);
        log.info(message);
diff --git a/docker/apollo/docker-quick-start/docker-compose.yml b/docker/apollo/docker-quick-start/docker-compose.yml
index 4d8de1f..10a0a3f 100644
--- a/docker/apollo/docker-quick-start/docker-compose.yml
+++ b/docker/apollo/docker-quick-start/docker-compose.yml
@@ -1,11 +1,12 @@
-version: '2'
+version: '2.1'
 
 services:
   apollo-quick-start:
     image: nobodyiam/apollo-quick-start
     container_name: apollo-quick-start
     depends_on:
-      - apollo-db
+      apollo-db:
+        condition: service_healthy
     ports:
       - "8080:8080"
       - "8090:8090"
@@ -14,11 +15,16 @@ services:
       - apollo-db
 
   apollo-db:
-    image: mysql:5.7
+    image: mysql:8.0
     container_name: apollo-db
     environment:
       TZ: Asia/Shanghai
       MYSQL_ALLOW_EMPTY_PASSWORD: 'yes'
+    healthcheck:
+      test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
+      interval: 5s
+      timeout: 1s
+      retries: 10
     depends_on:
       - apollo-dbdata
     ports:
diff --git a/docker/flink/docker-compose.yml b/docker/flink/docker-compose.yml
index 276fd31..7f669a5 100644
--- a/docker/flink/docker-compose.yml
+++ b/docker/flink/docker-compose.yml
@@ -1,40 +1,27 @@
 version: "2.2"
 services:
   jobmanager:
-    image: flink:1.14.3
+    image: flink:latest
     ports:
       - "8081:8081"
     command: jobmanager
-    volumes:
-      - ./sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:rw
-      - ./hive-site.xml:/opt/flink/conf/hive-site.xml:rw
     environment:
       - |
         FLINK_PROPERTIES=
         jobmanager.rpc.address: jobmanager
+      - SET_CONTAINER_TIMEZONE=true
+      - CONTAINER_TIMEZONE=Asia/Shanghai
+      - TZ=Asia/Shanghai
 
   taskmanager:
-    image: flink:1.14.3
+    image: flink:latest
     depends_on:
       - jobmanager
     command: taskmanager
-    volumes:
-      - ./sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:rw
-      - ./hive-site.xml:/opt/flink/conf/hive-site.xml:rw
     environment:
       - |
         FLINK_PROPERTIES=
         jobmanager.rpc.address: jobmanager
         taskmanager.numberOfTaskSlots: 2
-  sql-client:
-    image: flink:1.14.3
-    volumes:
-      - ./sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:rw
-      - ./hive-site.xml:/opt/flink/conf/hive-site.xml:rw
-    command: ./bin/sql-client.sh embedded ../conf/sql-client-defaults.yaml
-    depends_on:
-      - jobmanager
-    environment:
-      - |
-        FLINK_PROPERTIES=
-        jobmanager.rpc.address: jobmanager
-        rest.address: jobmanager
\ No newline at end of file
+      - SET_CONTAINER_TIMEZONE=true
+      - CONTAINER_TIMEZONE=Asia/Shanghai
+      - TZ=Asia/Shanghai
\ No newline at end of file
diff --git a/docker/flink/hive-site.xml b/docker/flink/hive-site.xml
deleted file mode 100644
index ef0ff43..0000000
--- a/docker/flink/hive-site.xml
+++ /dev/null
@@ -1,71 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-
-<configuration>
-    <!-- database connection used by the metastore -->
-    <property>
-        <name>javax.jdo.option.ConnectionURL</name>
-        <value>jdbc:postgresql://3y_ip:5432/metastore?createDatabaseIfNotExist=true</value>
-        <description>JDBC connect string for a JDBC metastore</description>
-    </property>
-
-    <property>
-        <name>javax.jdo.option.ConnectionDriverName</name>
-        <value>org.postgresql.Driver</value>
-        <description>Driver class name for a JDBC metastore</description>
-    </property>
-
-    <property>
-        <name>javax.jdo.option.ConnectionUserName</name>
-        <value>hive</value>
-        <description>username to use against metastore database</description>
-    </property>
-
-    <property>
-        <name>javax.jdo.option.ConnectionPassword</name>
-        <value>hive</value>
-        <description>password to use against metastore database</description>
-    </property>
-
-    <property>
-        <name>hive.metastore.uris</name>
-        <value>thrift://3y_ip:9083</value>
-        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.
-        </description>
-    </property>
-
-    <property>
-        <name>datanucleus.schema.autoCreateAll</name>
-        <value>true</value>
-    </property>
-
-    <property>
-        <name>hive.server2.logging.operation.log.location</name>
-        <value>/root/sd/apache-hive-2.3.4-bin/tmp/operation_logs</value>
-        <description>Top level directory where operation logs are stored if logging functionality is enabled
-        </description>
-    </property>
-
-    <property>
-        <name>hive.exec.scratchdir</name>
-        <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive</value>
-        <description>HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. For each
-            connecting
-            user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with
-            ${hive.scratch.dir.permission}.
-        </description>
-    </property>
-
-    <property>
-        <name>hive.exec.local.scratchdir</name>
-        <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/local</value>
-        <description>Local scratch space for Hive jobs</description>
-    </property>
-
-    <property>
-        <name>hive.downloaded.resources.dir</name>
-        <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/resources</value>
-        <description>Temporary local directory for added resources in the remote file system.</description>
-    </property>
-</configuration>
\ No newline at end of file
diff --git a/docker/flink/sql-client-defaults.yaml b/docker/flink/sql-client-defaults.yaml
deleted file mode 100644
index a7e9f9e..0000000
--- a/docker/flink/sql-client-defaults.yaml
+++ /dev/null
@@ -1,5 +0,0 @@
-catalogs:
-  - name: default_catalog
-    type: hive
-    default-database: austin_hive
-    hive-conf-dir: /opt/flink/conf
\ No newline at end of file
diff --git a/docker/hive/docker-compose.yml b/docker/hive/docker-compose.yml
new file mode 100644
index 0000000..26f99a9
--- /dev/null
+++ b/docker/hive/docker-compose.yml
@@ -0,0 +1,55 @@
+version: "3"
+
+services:
+  namenode:
+    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
+    volumes:
+      - namenode:/hadoop/dfs/name
+    environment:
+      - CLUSTER_NAME=test
+    env_file:
+      - ./hadoop-hive.env
+    ports:
+      - "50070:50070"
+      - "9000:9000"
+      - "8020:8020"
+  datanode:
+    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
+    volumes:
+      - datanode:/hadoop/dfs/data
+    env_file:
+      - ./hadoop-hive.env
+    environment:
+      SERVICE_PRECONDITION: "namenode:50070"
+    ports:
+      - "50075:50075"
+      - "50010:50010"
+      - "50020:50020"
+  hive-server:
+    image: bde2020/hive:2.3.2-postgresql-metastore
+    env_file:
+      - ./hadoop-hive.env
+    environment:
+      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
+      SERVICE_PRECONDITION: "hive-metastore:9083"
+    ports:
+      - "10000:10000"
+  hive-metastore:
+    image: bde2020/hive:2.3.2-postgresql-metastore
+    env_file:
+      - ./hadoop-hive.env
+    command: /opt/hive/bin/hive --service metastore
+    environment:
+      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
+    ports:
+      - "9083:9083"
+  hive-metastore-postgresql:
+    image: bde2020/hive-metastore-postgresql:2.3.0
+  presto-coordinator:
+    image: shawnzhu/prestodb:0.181
+    ports:
+      - "8080:8080"
+
+volumes:
+  namenode:
+  datanode:
\ No newline at end of file
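Once this compose stack is up, HiveServer2 is exposed on port 10000, so you can confirm the Flink job is really writing rows with a plain JDBC query. This is only a sketch: it assumes the `hive-jdbc` driver is on the classpath, that the port is mapped to localhost, and that the quick-start setup accepts empty credentials; the class name is made up.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveQuerySmokeTest {
    public static void main(String[] args) throws Exception {
        // register the HiveServer2 JDBC driver explicitly
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // assumption: hive-server from the compose file above, mapped to localhost:10000
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/austin", "", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT ids, state, businessId, logTimestamp FROM message_anchor LIMIT 10")) {
            while (rs.next()) {
                System.out.printf("%s %s %s %s%n",
                        rs.getString(1), rs.getString(2), rs.getString(3), rs.getString(4));
            }
        }
    }
}
```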
diff --git a/sql/austin.sql b/sql/austin.sql
index 0f7782e..8fd7d3e 100644
--- a/sql/austin.sql
+++ b/sql/austin.sql
@@ -82,13 +82,14 @@ CREATE TABLE `channel_account`
   DEFAULT CHARSET = utf8mb4
   COLLATE = utf8mb4_unicode_ci COMMENT ='channel account info';
 
 -------------------------------------- SQL to initialize xxl-job ----------------------------------
 CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
 use `xxl_job`;
 
 SET NAMES utf8mb4;
 
+drop table IF EXISTS xxl_job_info;
+
 CREATE TABLE `xxl_job_info` (
     `id`        int(11) NOT NULL AUTO_INCREMENT,
     `job_group` int(11) NOT NULL COMMENT 'executor primary key ID',
@@ -117,6 +118,7 @@ CREATE TABLE `xxl_job_info` (
     PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
+drop table IF EXISTS xxl_job_log;
 CREATE TABLE `xxl_job_log` (
     `id`        bigint(20) NOT NULL AUTO_INCREMENT,
     `job_group` int(11) NOT NULL COMMENT 'executor primary key ID',
@@ -138,6 +140,7 @@ CREATE TABLE `xxl_job_log` (
     KEY `I_handle_code` (`handle_code`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
+drop table IF EXISTS xxl_job_log_report;
 CREATE TABLE `xxl_job_log_report` (
     `id`          int(11) NOT NULL AUTO_INCREMENT,
     `trigger_day` datetime DEFAULT NULL COMMENT 'trigger time',
@@ -149,6 +152,7 @@ CREATE TABLE `xxl_job_log_report` (
     UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
+drop table IF EXISTS xxl_job_logglue;
 CREATE TABLE `xxl_job_logglue` (
     `id`     int(11) NOT NULL AUTO_INCREMENT,
     `job_id` int(11) NOT NULL COMMENT 'job primary key ID',
@@ -160,6 +164,7 @@ CREATE TABLE `xxl_job_logglue` (
     PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
+drop table IF EXISTS xxl_job_registry;
 CREATE TABLE `xxl_job_registry` (
     `id`             int(11) NOT NULL AUTO_INCREMENT,
     `registry_group` varchar(50) NOT NULL,
@@ -170,6 +175,7 @@ CREATE TABLE `xxl_job_registry` (
     KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
+drop table IF EXISTS xxl_job_group;
 CREATE TABLE `xxl_job_group` (
     `id`       int(11) NOT NULL AUTO_INCREMENT,
     `app_name` varchar(64) NOT NULL COMMENT 'executor AppName',
@@ -180,6 +186,7 @@ CREATE TABLE `xxl_job_group` (
     PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
+drop table IF EXISTS xxl_job_user;
 CREATE TABLE `xxl_job_user` (
     `id`       int(11) NOT NULL AUTO_INCREMENT,
     `username` varchar(50) NOT NULL COMMENT 'username',
@@ -190,6 +197,7 @@ CREATE TABLE `xxl_job_user` (
     UNIQUE KEY `i_username` (`username`) USING BTREE
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
+drop table IF EXISTS xxl_job_lock;
 CREATE TABLE `xxl_job_lock` (
     `lock_name` varchar(50) NOT NULL COMMENT 'lock name',
     PRIMARY KEY (`lock_name`)
@@ -201,5 +209,3 @@ INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) V
 INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
 
 commit;
-
--------------------------------------- SQL to initialize xxl-job ----------------------------------
\ No newline at end of file
diff --git a/stream-start.sh b/stream-start.sh
index 8c06308..1765ee8 100644
--- a/stream-start.sh
+++ b/stream-start.sh
@@ -1,5 +1,9 @@
 #!/bin/bash
 docker cp ./austin-stream/target/austin-stream-0.0.1-SNAPSHOT.jar austin_jobmanager_1:/opt/
-
 docker exec -ti austin_jobmanager_1 flink run /opt/austin-stream-0.0.1-SNAPSHOT.jar
+
+# local test
+# docker cp ./austin-stream-0.0.1-SNAPSHOT.jar flink_docker_jobmanager_1:/opt/
+# docker exec -ti flink_docker_jobmanager_1 flink run /opt/austin-stream-0.0.1-SNAPSHOT.jar
+