|
|
@ -1,530 +1,3 @@
|
|
|
|
Apache Flink是一个**框架**和**分布式处理引擎**,用于在**无界**和**有界**数据流上进行**有状态的计算**。Flink被设计为在所有常见的集群环境中运行,以内存中的速度和任何规模执行计算。
|
|
|
|
Spark Streaming,即核心Spark API的扩展,不像Storm那样一次处理一个数据流。相反,它在处理数据流之前,会按照时间间隔对数据流进行分段切分。Spark针对连续数据流的抽象,我们称为DStream(Discretized Stream)。 DStream是小批处理的RDD(弹性分布式数据集), RDD则是分布式数据集,可以通过任意函数和滑动数据窗口(窗口计算)进行转换,实现并行操作。
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
针对流数据+批数据的计算框架。把批数据看作流数据的一种特例,延迟性较低(毫秒级),且能保证消息传输不丢失不重复。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Flink创造性地统一了流处理和批处理,作为流处理看待时输入数据流是无界的,而批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。Flink程序由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 处理无界和有界数据
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
数据可以作为无界流或有界流被处理:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- **Unbounded streams**(无界流)有一个起点,但没有定义的终点。它们不会终止,而且会源源不断的提供数据。无边界的流必须被连续地处理,即事件达到后必须被立即处理。等待所有输入数据到达是不可能的,因为输入是无界的,并且在任何时间点都不会完成。处理无边界的数据通常要求以特定顺序(例如,事件发生的顺序)接收事件,以便能够推断出结果的完整性。
|
|
|
|
|
|
|
|
- **Bounded streams**(有界流)有一个定义的开始和结束。在执行任何计算之前,可以通过摄取(提取)所有数据来处理有界流。处理有界流不需要有序摄取,因为有界数据集总是可以排序的。有界流的处理也称为批处理。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**Apache Flink擅长处理无界和有界数据集**。对时间和状态的精确控制使Flink的运行时能够在无边界的流上运行任何类型的应用程序。有界流由专门为固定大小的数据集设计的算法和数据结构在内部处理,从而产生出色的性能。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 分层API
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Flink提供了三层API。每个API在简洁性和表达性之间提供了不同的权衡,并且针对不同的使用场景
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 应用场景
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Apache Flink是开发和运行许多不同类型应用程序的最佳选择,因为它具有丰富的特性。Flink的特性包括支持流和批处理、复杂的状态管理、事件处理语义以及确保状态的一致性。此外,Flink可以部署在各种资源提供程序上,例如YARN、Apache Mesos和Kubernetes,也可以作为裸机硬件上的独立集群进行部署。配置为高可用性,Flink没有单点故障。Flink已经被证明可以扩展到数千个内核和TB级的应用程序状态,提供高吞吐量和低延迟,并支持世界上一些最苛刻的流处理应用程序。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
下面是Flink支持的最常见的应用程序类型:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- Event-driven Applications(事件驱动的应用程序)
|
|
|
|
|
|
|
|
- Data Analytics Applications(数据分析应用程序)
|
|
|
|
|
|
|
|
- Data Pipeline Applications(数据管道应用程序)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### Event-driven Applications
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Event-driven Applications(事件驱动的应用程序)。事件驱动的应用程序是一个有状态的应用程序,它从一个或多个事件流中获取事件,并通过触发计算、状态更新或外部操作对传入的事件作出反应。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
事件驱动的应用程序基于有状态的流处理应用程序。在这种设计中,数据和计算被放在一起,从而可以进行本地(内存或磁盘)数据访问。通过定期将检查点写入远程持久存储,可以实现容错。下图描述了传统应用程序体系结构和事件驱动应用程序之间的区别。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
代替查询远程数据库,事件驱动的应用程序在本地访问其数据,从而在吞吐量和延迟方面获得更好的性能。可以定期异步地将检查点同步到远程持久存,而且支持增量同步。不仅如此,在分层架构中,多个应用程序共享同一个数据库是很常见的。因此,数据库的任何更改都需要协调,由于每个事件驱动的应用程序都负责自己的数据,因此更改数据表示或扩展应用程序所需的协调较少。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
对于事件驱动的应用程序,Flink的突出特性是savepoint。保存点是一个一致的状态镜像,可以用作兼容应用程序的起点。给定一个保存点,就可以更新或调整应用程序的规模,或者可以启动应用程序的多个版本进行A/B测试。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
典型的事件驱动的应用程序有:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 欺诈检测
|
|
|
|
|
|
|
|
- 异常检测
|
|
|
|
|
|
|
|
- 基于规则的提醒
|
|
|
|
|
|
|
|
- 业务流程监控
|
|
|
|
|
|
|
|
- Web应用(社交网络)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### Data Analytics Applications
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Data Analytics Applications(数据分析应用程序)。传统上的分析是作为批处理查询或应用程序对已记录事件的有限数据集执行的。为了将最新数据合并到分析结果中,必须将其添加到分析数据集中,然后重新运行查询或应用程序,结果被写入存储系统或作为报告发出。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
有了复杂的流处理引擎,分析也可以以实时方式执行。流查询或应用程序不是读取有限的数据集,而是接收实时事件流,并在使用事件时不断地生成和更新结果。结果要么写入外部数据库,要么作为内部状态进行维护。Dashboard应用程序可以从外部数据库读取最新的结果,也可以直接查询应用程序的内部状态。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Apache Flink支持流以及批处理分析应用程序,如下图所示:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
典型的数据分析应用程序有:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 电信网络质量监控
|
|
|
|
|
|
|
|
- 产品更新分析及移动应用实验评估
|
|
|
|
|
|
|
|
- 消费者技术中实时数据的特别分析
|
|
|
|
|
|
|
|
- 大规模图分析
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### Data Pipeline Applications
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Data Pipeline Applications(数据管道应用程序)。提取-转换-加载(ETL)是在存储系统之间转换和移动数据的常用方法。通常,会定期触发ETL作业,以便将数据从事务性数据库系统复制到分析数据库或数据仓库。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
数据管道的作用类似于ETL作业。它们转换和丰富数据,并可以将数据从一个存储系统移动到另一个存储系统。但是,它们以连续流模式运行,而不是周期性地触发。因此,它们能够从不断产生数据的源读取记录,并以低延迟将其移动到目的地。例如,数据管道可以监视文件系统目录中的新文件,并将它们的数据写入事件日志。另一个应用程序可能将事件流物化到数据库,或者增量地构建和完善搜索索引。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
下图描述了周期性ETL作业和连续数据管道之间的差异:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
与周期性ETL作业相比,连续数据管道的明显优势是减少了将数据移至其目的地的等待时间。此外,数据管道更通用,可用于更多场景,因为它们能够连续消费和产生数据。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
典型的数据管道应用程序有:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 电商中实时搜索索引的建立
|
|
|
|
|
|
|
|
- 电商中的持续ETL
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 安装Flink
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
https://flink.apache.org/downloads.html
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
下载安装包,这里下载的是 flink-1.10.1-bin-scala_2.11.tgz
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
安装参考 https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/tutorials/local_setup.html
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```shell
|
|
|
|
|
|
|
|
./bin/start-cluster.sh # Start Flink
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
访问 http://localhost:8081
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
运行 WordCount 示例
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 商品实时推荐
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
基于Flink实现的商品实时推荐系统。flink统计商品热度,放入redis缓存,分析日志信息,将画像标签和实时记录放入Hbase。在用户发起推荐请求后,根据用户画像重排序热度榜,并结合协同过滤和标签两个推荐模块为新生成的榜单的每一个产品添加关联产品,最后返回新的用户列表。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 系统架构
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
在日志数据模块(flink-2-hbase)中,又主要分为6个Flink任务:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- **用户-产品浏览历史 -> 实现基于协同过滤的推荐逻辑**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
通过Flink去记录用户浏览过这个类目下的哪些产品,为后面的基于Item的协同过滤做准备 实时的记录用户的评分到Hbase中,为后续离线处理做准备。数据存储在Hbase的p_history表
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- **用户-兴趣 -> 实现基于上下文的推荐逻辑**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
根据用户对同一个产品的操作计算兴趣度,计算规则通过操作间隔时间(如购物 - 浏览 < 100s)则判定为一次兴趣事件 通过Flink的ValueState实现,如果用户的操作Action=3(收藏),则清除这个产品的state,如果超过100s没有出现Action=3的事件,也会清除这个state。数据存储在Hbase的u_interest表
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- **用户画像计算 -> 实现基于标签的推荐逻辑**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
v1.0按照三个维度去计算用户画像,分别是用户的颜色兴趣,用户的产地兴趣,和用户的风格兴趣.根据日志不断的修改用户画像的数据,记录在Hbase中。数据存储在Hbase的user表
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- **产品画像记录 -> 实现基于标签的推荐逻辑**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
用两个维度记录产品画像,一个是喜爱该产品的年龄段,另一个是性别。数据存储在Hbase的prod表
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- **事实热度榜 -> 实现基于热度的推荐逻辑**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
通过Flink时间窗口机制,统计当前时间的实时热度,并将数据缓存在Redis中。通过Flink的窗口机制计算实时热度,使用ListState保存一次热度榜。数据存储在redis中,按照时间戳存储list
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- **日志导入**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
从Kafka接收的数据直接导入进Hbase事实表,保存完整的日志log,日志中包含了用户Id,用户操作的产品id,操作时间,行为(如购买,点击,推荐等)。数据按时间窗口统计数据大屏需要的数据,返回前段展示。数据存储在Hbase的con表
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 推荐引擎逻辑
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**基于热度的推荐逻辑**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
​根据用户特征,重新排序热度榜,之后根据两种推荐算法计算得到的产品相关度评分,为每个热度榜中的产品推荐几个关联的产品。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**基于产品画像的产品相似度计算方法**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
基于产品画像的推荐逻辑依赖于产品画像和热度榜两个维度,产品画像有三个特征,包含color/country/style三个角度,通过计算用户对该类目产品的评分来过滤热度榜上的产品。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
在已经有产品画像的基础上,计算item与item之间的关联系,通过余弦相似度来计算两两之间的评分,最后在已有物品选中的情况下推荐关联性更高的产品。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 相似度 | A | B | C |
|
|
|
|
|
|
|
|
| ------ | ---- | ---- | ---- |
|
|
|
|
|
|
|
|
| A | 1 | 0.7 | 0.2 |
|
|
|
|
|
|
|
|
| B | 0.7 | 1 | 0.6 |
|
|
|
|
|
|
|
|
| C | 0.2 | 0.6 | 1 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**基于协同过滤的产品相似度计算方法**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
根据产品用户表(Hbase) 去计算公式得到相似度评分:
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**前台推荐页面**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
当前推荐结果分为3列,分别是热度榜推荐,协同过滤推荐和产品画像推荐:
|
|
|
|
|
|
|
|

|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 实时计算TopN热榜
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
本案例将实现一个“实时热门商品”的需求,我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5分钟输出最近一小时内点击量最多的前 N 个商品。将这个需求进行分解我们大概要做这么几件事情:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
|
|
|
|
|
|
|
|
- 过滤出点击行为数据
|
|
|
|
|
|
|
|
- 按一小时的窗口大小,每5分钟统计一次,做滑动窗口聚合(Sliding Window)
|
|
|
|
|
|
|
|
- 按每个窗口聚合,输出每个窗口中点击量前N名的商品
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 数据准备
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
这里我们准备了一份淘宝用户行为数据集(来自[阿里云天池公开数据集](https://tianchi.aliyun.com/datalab/index.htm))。本数据集包含了淘宝上某一天随机一百万用户的所有行为(包括点击、购买、加购、收藏)。数据集的组织形式和MovieLens-20M类似,即数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 列名称 | 说明 |
|
|
|
|
|
|
|
|
| :--------- | :------------------------------------------------- |
|
|
|
|
|
|
|
|
| 用户ID | 整数类型,加密后的用户ID |
|
|
|
|
|
|
|
|
| 商品ID | 整数类型,加密后的商品ID |
|
|
|
|
|
|
|
|
| 商品类目ID | 整数类型,加密后的商品所属类目ID |
|
|
|
|
|
|
|
|
| 行为类型 | 字符串,枚举类型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’) |
|
|
|
|
|
|
|
|
| 时间戳 | 行为发生的时间戳,单位秒 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
你可以通过下面的命令下载数据集到项目的 `resources` 目录下:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```shell
|
|
|
|
|
|
|
|
$ cd my-flink-project/src/main/resources
|
|
|
|
|
|
|
|
$ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
这里是否使用 curl 命令下载数据并不重要,你也可以使用 wget 命令或者直接访问链接下载数据。关键是,**将数据文件保存到项目的 `resources` 目录下**,方便应用程序访问。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 编写程序
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 创建模拟数据源
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
我们先创建一个 `UserBehavior` 的 POJO 类(所有成员变量声明成`public`便是POJO类),强类型化后能方便后续的处理。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 用户行为数据结构
|
|
|
|
|
|
|
|
**/
|
|
|
|
|
|
|
|
public static class UserBehavior {
|
|
|
|
|
|
|
|
public long userId; // 用户ID
|
|
|
|
|
|
|
|
public long itemId; // 商品ID
|
|
|
|
|
|
|
|
public int categoryId; // 商品类目ID
|
|
|
|
|
|
|
|
public String behavior; // 用户行为, 包括("pv", "buy", "cart", "fav")
|
|
|
|
|
|
|
|
public long timestamp; // 行为发生的时间戳,单位秒
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
接下来我们就可以创建一个 `PojoCsvInputFormat` 了, 这是一个读取 csv 文件并将每一行转成指定 POJO
|
|
|
|
|
|
|
|
类型(在我们案例中是 `UserBehavior`)的输入器。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
// UserBehavior.csv 的本地文件路径
|
|
|
|
|
|
|
|
URL fileUrl = HotItems2.class.getClassLoader().getResource("UserBehavior.csv");
|
|
|
|
|
|
|
|
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
|
|
|
|
|
|
|
|
// 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
|
|
|
|
|
|
|
|
PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);
|
|
|
|
|
|
|
|
// 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
|
|
|
|
|
|
|
|
String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
|
|
|
|
|
|
|
|
// 创建 PojoCsvInputFormat
|
|
|
|
|
|
|
|
PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
下一步我们用 `PojoCsvInputFormat` 创建输入源。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
这就创建了一个 `UserBehavior` 类型的 `DataStream`。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### EventTime与Watermark
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
当我们说“统计过去一小时内点击量”,这里的“一小时”是指什么呢? 在 Flink 中它可以是指 ProcessingTime ,也可以是 EventTime,由用户决定。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- **ProcessingTime**:**事件被处理的时间**。也就是由机器的系统时间来决定
|
|
|
|
|
|
|
|
- **EventTime**:**事件发生的时间**。一般就是数据本身携带的时间
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
在本案例中,我们需要统计业务时间上的每小时的点击量,所以要基于 EventTime 来处理。那么如果让 Flink 按照我们想要的业务时间来处理呢?这里主要有两件事情要做:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 告诉 Flink 我们现在按照 EventTime 模式进行处理,Flink 默认使用 ProcessingTime 处理,所以我们要显式设置下。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 指定如何获得业务时间,以及生成 Watermark。Watermark 是用来追踪业务事件的概念,可以理解成 EventTime 世界中的时钟,用来指示当前处理到什么时刻的数据了。由于我们的数据源的数据已经经过整理,没有乱序,即事件的时间戳是单调递增的,所以可以将每条数据的业务时间就当做 Watermark。这里我们用 `AscendingTimestampExtractor` 来实现时间戳的抽取和 Watermark 的生成。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**注意**:真实业务场景一般都是存在乱序的,所以一般使用 `BoundedOutOfOrdernessTimestampExtractor`。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
DataStream<UserBehavior> timedData = dataSource
|
|
|
|
|
|
|
|
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public long extractAscendingTimestamp(UserBehavior userBehavior) {
|
|
|
|
|
|
|
|
// 原始数据单位秒,将其转成毫秒
|
|
|
|
|
|
|
|
return userBehavior.timestamp * 1000;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 过滤出点击事件
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
在开始窗口操作之前,先回顾下需求“每隔5分钟输出过去一小时内点击量最多的前 N 个商品”。由于原始数据中存在点击、加购、购买、收藏各种行为的数据,但是我们只需要统计点击量,所以先使用 `FilterFunction` 将点击行为数据过滤出来。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
DataStream<UserBehavior> pvData = timedData
|
|
|
|
|
|
|
|
.filter(new FilterFunction<UserBehavior>() {
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public boolean filter(UserBehavior userBehavior) throws Exception {
|
|
|
|
|
|
|
|
// 过滤出只有点击的数据
|
|
|
|
|
|
|
|
return userBehavior.behavior.equals("pv");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 窗口统计点击量
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
由于要每隔5分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
DataStream<ItemViewCount> windowedData = pvData
|
|
|
|
|
|
|
|
// 对商品进行分组
|
|
|
|
|
|
|
|
.keyBy("itemId")
|
|
|
|
|
|
|
|
// 对每个商品做滑动窗口(1小时窗口,5分钟滑动一次)
|
|
|
|
|
|
|
|
.timeWindow(Time.minutes(60), Time.minutes(5))
|
|
|
|
|
|
|
|
// 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力
|
|
|
|
|
|
|
|
.aggregate(new CountAgg(), new WindowResultFunction());
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**CountAgg**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
这里的`CountAgg`实现了`AggregateFunction`接口,功能是统计窗口中的条数,即遇到一条数据就加一。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* COUNT 统计的聚合函数实现,每出现一条记录加一
|
|
|
|
|
|
|
|
**/
|
|
|
|
|
|
|
|
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public Long createAccumulator() {
|
|
|
|
|
|
|
|
return 0L;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public Long add(UserBehavior userBehavior, Long acc) {
|
|
|
|
|
|
|
|
return acc + 1;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public Long getResult(Long acc) {
|
|
|
|
|
|
|
|
return acc;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public Long merge(Long acc1, Long acc2) {
|
|
|
|
|
|
|
|
return acc1 + acc2;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**WindowFunction**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
`.aggregate(AggregateFunction af, WindowFunction wf)` 的第二个参数`WindowFunction`将每个 key每个窗口聚合后的结果带上其他信息进行输出。这里实现的`WindowResultFunction`将主键商品ID,窗口,点击量封装成了`ItemViewCount`进行输出。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 用于输出窗口的结果
|
|
|
|
|
|
|
|
**/
|
|
|
|
|
|
|
|
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public void apply(
|
|
|
|
|
|
|
|
Tuple key, // 窗口的主键,即 itemId
|
|
|
|
|
|
|
|
TimeWindow window, // 窗口
|
|
|
|
|
|
|
|
Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值
|
|
|
|
|
|
|
|
Collector<ItemViewCount> collector // 输出类型为 ItemViewCount
|
|
|
|
|
|
|
|
) throws Exception {
|
|
|
|
|
|
|
|
Long itemId = ((Tuple1<Long>) key).f0;
|
|
|
|
|
|
|
|
Long count = aggregateResult.iterator().next();
|
|
|
|
|
|
|
|
collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 商品点击量(窗口操作的输出类型)
|
|
|
|
|
|
|
|
**/
|
|
|
|
|
|
|
|
public static class ItemViewCount {
|
|
|
|
|
|
|
|
public long itemId; // 商品ID
|
|
|
|
|
|
|
|
public long windowEnd; // 窗口结束时间戳
|
|
|
|
|
|
|
|
public long viewCount; // 商品的点击量
|
|
|
|
|
|
|
|
public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
|
|
|
|
|
|
|
|
ItemViewCount result = new ItemViewCount();
|
|
|
|
|
|
|
|
result.itemId = itemId;
|
|
|
|
|
|
|
|
result.windowEnd = windowEnd;
|
|
|
|
|
|
|
|
result.viewCount = viewCount;
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
现在我们得到了每个商品在每个窗口的点击量的数据流。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### TopN计算最热门商品
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据`ItemViewCount`中的`windowEnd`进行`keyBy()`操作。然后使用 `ProcessFunction` 实现一个自定义的 TopN 函数 `TopNHotItems` 来计算点击量排名前3名的商品,并将排名结果格式化成字符串,便于后续输出。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
DataStream<String> topItems = windowedData
|
|
|
|
|
|
|
|
.keyBy("windowEnd")
|
|
|
|
|
|
|
|
.process(new TopNHotItems(3)); // 求点击量前3名的商品
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
`ProcessFunction` 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我们将利用 timer 来判断何时**收齐**了某个 window 下所有商品的点击量数据。由于 Watermark 的进度是全局的,
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
在 `processElement` 方法中,每当收到一条数据(`ItemViewCount`),我们就注册一个 `windowEnd+1` 的定时器(Flink 框架会自动忽略同一时间的重复注册)。`windowEnd+1` 的定时器被触发时,意味着收到了`windowEnd+1`的 Watermark,即收齐了该`windowEnd`下的所有商品窗口统计值。我们在 `onTimer()` 中处理将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
这里我们还使用了 `ListState` 来存储收到的每条 `ItemViewCount` 消息,保证在发生故障时,状态数据的不丢失和一致性。`ListState` 是 Flink 提供的类似 Java `List` 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串
|
|
|
|
|
|
|
|
**/
|
|
|
|
|
|
|
|
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
|
|
|
|
|
|
|
|
private final int topSize;
|
|
|
|
|
|
|
|
public TopNHotItems(int topSize) {
|
|
|
|
|
|
|
|
this.topSize = topSize;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
|
|
|
|
|
|
|
|
private ListState<ItemViewCount> itemState;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public void open(Configuration parameters) throws Exception {
|
|
|
|
|
|
|
|
super.open(parameters);
|
|
|
|
|
|
|
|
// 状态的注册
|
|
|
|
|
|
|
|
ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
|
|
|
|
|
|
|
|
itemState = getRuntimeContext().getListState(itemsStateDesc);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public void processElement(ItemViewCount input, Context context, Collector<String> collector) throws Exception {
|
|
|
|
|
|
|
|
// 每条数据都保存到状态中
|
|
|
|
|
|
|
|
itemState.add(input);
|
|
|
|
|
|
|
|
// 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
|
|
|
|
|
|
|
|
context.timerService().registerEventTimeTimer(input.windowEnd + 1);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
|
|
|
|
|
|
|
|
// 获取收到的所有商品点击量
|
|
|
|
|
|
|
|
List<ItemViewCount> allItems = new ArrayList<>();
|
|
|
|
|
|
|
|
for (ItemViewCount item : itemState.get()) {
|
|
|
|
|
|
|
|
allItems.add(item);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// 提前清除状态中的数据,释放空间
|
|
|
|
|
|
|
|
itemState.clear();
|
|
|
|
|
|
|
|
// 按照点击量从大到小排序
|
|
|
|
|
|
|
|
allItems.sort(new Comparator<ItemViewCount>() {
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
|
|
|
public int compare(ItemViewCount o1, ItemViewCount o2) {
|
|
|
|
|
|
|
|
return (int) (o2.viewCount - o1.viewCount);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 将排名信息格式化成 String, 便于打印
|
|
|
|
|
|
|
|
StringBuilder result = new StringBuilder();
|
|
|
|
|
|
|
|
result.append("====================================\n");
|
|
|
|
|
|
|
|
result.append("时间: ").append(new Timestamp(timestamp-1)).append("\n");
|
|
|
|
|
|
|
|
for (int i=0;i<topSize;i++) {
|
|
|
|
|
|
|
|
ItemViewCount currentItem = allItems.get(i);
|
|
|
|
|
|
|
|
// No1: 商品ID=12224 浏览量=2413
|
|
|
|
|
|
|
|
result.append("No").append(i).append(":")
|
|
|
|
|
|
|
|
.append(" 商品ID=").append(currentItem.itemId)
|
|
|
|
|
|
|
|
.append(" 浏览量=").append(currentItem.viewCount)
|
|
|
|
|
|
|
|
.append("\n");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
result.append("====================================\n\n");
|
|
|
|
|
|
|
|
out.collect(result.toString());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 打印输出
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
最后一步我们将结果打印输出到控制台,并调用`env.execute`执行任务。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
topItems.print();
|
|
|
|
|
|
|
|
env.execute("Hot Items Job");
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### 运行程序
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
直接运行 main 函数,就能看到不断输出的每个时间点的热门商品ID。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|

|
|
|
|
|