mirror of https://github.com/ZhongFuCheng3y/austin
parent: cbf28fb1a7
commit: cc580a9b65
@ -1,82 +0,0 @@
package com.java3y.austin.datahouse;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

/**
 * Flink bootstrap class
 *
 * @author 3y
 */
public class AustinBootStrap {

    public static void main(String[] args) throws Exception {

        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        String catalogName = "my_hive";

        HiveCatalog catalog = new HiveCatalog(catalogName, "austin_hive", null);

        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS austin");
        tableEnv.executeSql("DROP TABLE IF EXISTS austin.message_anchor_info");

        tableEnv.executeSql("CREATE TABLE austin.message_anchor_info(" +
                "ids String,\n" +
                "state String,\n" +
                "businessId String,\n" +
                "log_ts Timestamp(3),\n" +
                "WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'austinTraceLog',\n" +
                " 'properties.bootstrap.servers' = 'kafka_ip:9092',\n" +
                " 'properties.group.id' = 'flink1',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'format' = 'json',\n" +
                " 'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'" +
                ")");

//        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp");
//        tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.log_hive");
//
//        tableEnv.executeSql(" CREATE TABLE hive_tmp.log_hive (\n" +
//                "  user_id STRING,\n" +
//                "  order_amount DOUBLE\n" +
//                " ) PARTITIONED BY (\n" +
//                "  dt STRING,\n" +
//                "  hr STRING\n" +
//                " ) STORED AS PARQUET\n" +
//                " TBLPROPERTIES (\n" +
//                "  'sink.partition-commit.trigger' = 'partition-time',\n" +
//                "  'sink.partition-commit.delay' = '1 min',\n" +
//                "  'format' = 'json',\n" +
//                "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
//                "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'" +
//                " )");
//        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
//
//        tableEnv.executeSql("" +
//                " INSERT INTO hive_tmp.log_hive\n" +
//                " SELECT\n" +
//                "  user_id,\n" +
//                "  order_amount,\n" +
//                "  DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')\n" +
//                " FROM austin.message_anchor_info");

    }
}
@ -0,0 +1,80 @@
package com.java3y.austin.datahouse;

import com.java3y.austin.datahouse.constants.DataHouseConstant;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * Flink bootstrap class
 * <p>
 * Consumes messages from Kafka and writes them into a Hive table
 *
 * @author 3y
 */
public class AustinHiveBootStrap {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1).enableCheckpointing(3000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Initialize the Hive catalog
        String catalogName = DataHouseConstant.CATALOG_NAME;
        HiveCatalog catalog = new HiveCatalog(catalogName, DataHouseConstant.CATALOG_DEFAULT_DATABASE, null);
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        // 2. Create the Kafka source table
        String kafkaSourceTableCreate = "DROP TABLE IF EXISTS " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SOURCE_TABLE_NAME;
        tableEnv.executeSql(kafkaSourceTableCreate);
        String kafkaSourceTableDDL = "CREATE TABLE <CATALOG_DEFAULT_DATABASE>.<KAFKA_SOURCE_TABLE_NAME> (\n" +
                "`ids` String,\n" +
                "`state` String,\n" +
                "`businessId` String,\n" +
                "`logTimestamp` String\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '<KAFKA_TOPIC>',\n" +
                " 'properties.bootstrap.servers' = '<KAFKA_IP_PORT>',\n" +
                " 'properties.group.id' = 'group_test_01',\n" +
                " 'format' = 'json',\n" +
                " 'json.fail-on-missing-field' = 'true',\n" +
                " 'json.ignore-parse-errors' = 'false',\n" +
                " 'scan.topic-partition-discovery.interval' = '1s',\n" +
                " 'scan.startup.mode' = 'latest-offset')";
        kafkaSourceTableDDL = kafkaSourceTableDDL.replace("<CATALOG_DEFAULT_DATABASE>", DataHouseConstant.CATALOG_DEFAULT_DATABASE)
                .replace("<KAFKA_SOURCE_TABLE_NAME>", DataHouseConstant.KAFKA_SOURCE_TABLE_NAME)
                .replace("<KAFKA_IP_PORT>", DataHouseConstant.KAFKA_IP_PORT)
                .replace("<KAFKA_TOPIC>", DataHouseConstant.KAFKA_TOPIC);

        tableEnv.executeSql(kafkaSourceTableDDL);

        // 3. Create the Hive sink table (Hive dialect)
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("DROP TABLE IF EXISTS " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SINK_TABLE_NAME);
        String kafkaSinkTableDDL = "CREATE TABLE IF NOT EXISTS <CATALOG_DEFAULT_DATABASE>.<KAFKA_SINK_TABLE_NAME> (\n" +
                "`ids` String,\n" +
                "`state` String,\n" +
                "`businessId` String,\n" +
                "`logTimestamp` String\n" +
                ") STORED AS PARQUET\n" +
                "TBLPROPERTIES (\n" +
                " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                " 'sink.partition-commit.trigger' = 'partition-time',\n" +
                " 'sink.partition-commit.delay' = '1 min',\n" +
                " 'sink.buffer-flush.max-rows' = '10',\n" +
                " 'sink.buffer-flush.interval' = '5s'\n" +
                ")";
        kafkaSinkTableDDL = kafkaSinkTableDDL.replace("<CATALOG_DEFAULT_DATABASE>", DataHouseConstant.CATALOG_DEFAULT_DATABASE)
                .replace("<KAFKA_SINK_TABLE_NAME>", DataHouseConstant.KAFKA_SINK_TABLE_NAME);
        tableEnv.executeSql(kafkaSinkTableDDL);

        // 4. Write the Kafka source table into the Hive sink table (default dialect)
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SINK_TABLE_NAME + " SELECT ids,state,businessId,logTimestamp FROM " + DataHouseConstant.CATALOG_DEFAULT_DATABASE + "." + DataHouseConstant.KAFKA_SOURCE_TABLE_NAME);

    }
}
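Note: the class above depends on com.java3y.austin.datahouse.constants.DataHouseConstant, which is not included in this diff. A minimal sketch of what that constants class might contain, under stated assumptions: the catalog name, database, topic, and broker placeholder are taken from elsewhere in this commit, while the two table names are purely hypothetical.

    package com.java3y.austin.datahouse.constants;

    /**
     * Sketch of the constants referenced by AustinHiveBootStrap (values are assumptions, not the actual source).
     */
    public class DataHouseConstant {
        public static final String CATALOG_NAME = "my_hive";                        // assumed; matches the old AustinBootStrap
        public static final String CATALOG_DEFAULT_DATABASE = "austin_hive";        // assumed; matches sql-client-defaults.yaml
        public static final String KAFKA_SOURCE_TABLE_NAME = "message_anchor_source"; // hypothetical table name
        public static final String KAFKA_SINK_TABLE_NAME = "message_anchor_info";     // hypothetical table name
        public static final String KAFKA_TOPIC = "austinTraceLog";                  // assumed; matches the old Kafka DDL
        public static final String KAFKA_IP_PORT = "kafka_ip:9092";                 // placeholder; replace with the real broker address
    }

Also worth noting: executeSql on the final INSERT submits the streaming job and returns a TableResult immediately, so main() can return once submission succeeds while the job keeps running on the cluster.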
@ -1,5 +0,0 @@
catalogs:
  - name: my_hive
    type: hive
    default-database: austin_hive
    hive-conf-dir: /opt/flink/conf
@ -1,40 +1,27 @@
version: "2.2"
services:
  jobmanager:
    image: flink:1.14.3
    image: flink:latest
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - ./sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:rw
      - ./hive-site.xml:/opt/flink/conf/hive-site.xml:rw
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
      - SET_CONTAINER_TIMEZONE=true
      - CONTAINER_TIMEZONE=Asia/Shanghai
      - TZ=Asia/Shanghai
  taskmanager:
    image: flink:1.14.3
    image: flink:latest
    depends_on:
      - jobmanager
    command: taskmanager
    volumes:
      - ./sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:rw
      - ./hive-site.xml:/opt/flink/conf/hive-site.xml:rw
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
  sql-client:
    image: flink:1.14.3
    volumes:
      - ./sql-client-defaults.yaml:/opt/flink/conf/sql-client-defaults.yaml:rw
      - ./hive-site.xml:/opt/flink/conf/hive-site.xml:rw
    command: ./bin/sql-client.sh embedded ../conf/sql-client-defaults.yaml
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
      - SET_CONTAINER_TIMEZONE=true
      - CONTAINER_TIMEZONE=Asia/Shanghai
      - TZ=Asia/Shanghai
@ -1,71 +0,0 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- JDBC connection URL of the metastore database (PostgreSQL) -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:postgresql://3y_ip:5432/metastore?createDatabaseIfNotExist=true</value>
        <description>JDBC connect string for a JDBC metastore</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>org.postgresql.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
        <description>username to use against metastore database</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hive</value>
        <description>password to use against metastore database</description>
    </property>

    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://3y_ip:9083</value>
        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
    </property>

    <property>
        <name>datanucleus.schema.autoCreateAll</name>
        <value>true</value>
    </property>

    <property>
        <name>hive.server2.logging.operation.log.location</name>
        <value>/root/sd/apache-hive-2.3.4-bin/tmp/operation_logs</value>
        <description>Top level directory where operation logs are stored if logging functionality is enabled</description>
    </property>

    <property>
        <name>hive.exec.scratchdir</name>
        <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive</value>
        <description>HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. For each connecting
            user, an HDFS scratch dir: ${hive.exec.scratchdir}/&lt;username&gt; is created, with ${hive.scratch.dir.permission}.
        </description>
    </property>

    <property>
        <name>hive.exec.local.scratchdir</name>
        <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/local</value>
        <description>Local scratch space for Hive jobs</description>
    </property>

    <property>
        <name>hive.downloaded.resources.dir</name>
        <value>/root/sd/apache-hive-2.3.4-bin/tmp/hive/resources</value>
        <description>Temporary local directory for added resources in the remote file system.</description>
    </property>
</configuration>
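The docker-compose file above mounts this hive-site.xml into /opt/flink/conf, which is typically on Flink's classpath; that is presumably how the HiveCatalog constructed with a null hive-conf-dir finds the metastore at thrift://3y_ip:9083. If the file lived elsewhere, the conf directory could be passed explicitly instead. A small sketch, meant as a fragment of the bootstrap's main method and reusing the catalog and database names from this commit:

    import org.apache.flink.table.catalog.hive.HiveCatalog;

    // The third argument is the directory containing hive-site.xml; null means "search the classpath".
    HiveCatalog catalog = new HiveCatalog("my_hive", "austin_hive", "/opt/flink/conf");
    tableEnv.registerCatalog("my_hive", catalog);
    tableEnv.useCatalog("my_hive");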
@ -1,5 +0,0 @@
catalogs:
  - name: default_catalog
    type: hive
    default-database: austin_hive
    hive-conf-dir: /opt/flink/conf
@ -0,0 +1,55 @@
version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
      - "9000:9000"
      - "8020:8020"
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
      - "50010:50010"
      - "50020:50020"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    ports:
      - "8080:8080"

volumes:
  namenode:
  datanode:
@ -1,5 +1,9 @@
#!/bin/bash

docker cp ./austin-stream/target/austin-stream-0.0.1-SNAPSHOT.jar austin_jobmanager_1:/opt/

docker exec -ti austin_jobmanager_1 flink run /opt/austin-stream-0.0.1-SNAPSHOT.jar

# local test
# docker cp ./austin-stream-0.0.1-SNAPSHOT.jar flink_docker_jobmanager_1:/opt/
# docker exec -ti flink_docker_jobmanager_1 flink run /opt/austin-stream-0.0.1-SNAPSHOT.jar