作者:马国维(黎钢)
来源:微信公众号: Flink 中文社区
出处
:https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247491453&idx=1&sn=
358f1beffd6392edeef52cb13ed06230
随着互联网和移动互联网的不断发展,各行各业都积累海量的业务数据。 而企业为了改善用户体验,提升产品在市场上的竞争力,都采取了实时化方式来处理大数据。 社交媒体的实时大屏、电商的实时推荐、城市大脑的实时交通预测、金融行业的实时反欺诈,这些产品的成功都在说明大数据处理的实时化已经成为一个势不可挡的潮流。
在实时化的大趋势下,Flink 已经成为实时计算行业的事实标准。我们看到,不光是阿里巴巴,国内外各个领域的头部厂商,都把 Flink 做为实时计算的技术底座,国内有字节跳动、腾讯、华为,国外有 Netflix、Uber 等等。
而业务实时化只是一个起点,Flink 的目标之一就是给用户提供实时离线一体化的用户体验。其实很多用户不仅需要实时的数据统计,为了确认运营或产品的策略的效果,用户同时还需要和历史(昨天,甚至是去年的同期)数据比较。而从用户的角度来看,原有的流、批独立方案存在一些痛点:
2020 年的双十一,在实时洪峰到达 40 亿的历史新高的同时,Flink 团队与 DT 团队一起推出了基于 Flink 的全链路流批一体的数仓架构,很好解决了 Lambda 的架构所带来的一系列问题:流批作业使用同一 SQL,使研发效率提升了 3~4 倍;一套引擎确保了数据口径天然一致;流批作业在同一集群运行,削峰填谷大幅提升了资源效率。
Flink 流批一体的成功,离不开 Flink 开源社区的健康蓬勃发展。从 Apache 软件基金会 2020 年度报告可以看出,在反映开源社区繁荣情况的三个关键指标上 Flink 都名列前茅:用户邮件列表活跃度,Flink 排名第一;开发者提交次数 Flink 排名第二,Github 用户访问量排名第二。这些数据并不局限于大数据领域,而是 Apache 开源基金会下属的所有项目。
2020 年也是 Blink 反哺社区的第二年,这两年我们把 Blink 在集团内积累的经验逐步贡献回社区,让 Flink 成为真正意义上的流批一体平台。我希望通过这篇文章给大家分享下这两年 Flink 在执行引擎流批融合方面做了哪些工作。同时也希望 Flink 的老用户和新朋友可以进一步了解 Flink 流批一体架构的“前世今生”。
二、流批一体的分层架构
总体来说,Flink 的核心引擎主要分为如下三层:
本文主要分享如下几个方面的内容:
三、流批一体 DataStream
如上图所示,目前 Flink 提供的 SDK 主要有三类:
目前 Table/SQL 对于流批统一的场景支持已经比较成熟,但是对于 Phyiscal SDK 来说还面临的一些挑战,主要有两个方面:
为了解决上述 Physical SDK 所面临的挑战,我们把 Unified DataStream SDK 作为 Flink 统一的 Physical SDK。这个部分主要解决两个问题:
为了解决学习和理解成本比较高的问题,最自然最简单的方案就是从 DataStream 和 DataSet 中选择一个作为 Flink 的唯一的 Physical SDK。那为什么我们选择了 DataStream 而不是 DataSet 呢?主要有两个原因:
很多对 Flink 有一定了解的用户可能会问:DataStream 是同时支持 Bounded/Unbounded 的输入,为什么我们会说:用 DataStream 无法写出一个真正生产可以用的流批一体 Application 呢?简单来说,DataStream 原本主要设计目标是给 Unbounded 场景使用的,所以导致在 Bounded 的场景下在效率、可用性、易用性方面和传统的批引擎还有一定距离。具体来说体现在如下两个方面:
先给大家看一个例子,下边是一个跑同样规模的 WordCount 的作业,DataStream 和 DataSet 的性能对比图。从这个例子可以看出,DataSet 的性能是 DataStream 将近 5 倍。
很明显,要让 DataStream 在生产中既可以支持流的场景又要支持批的场景,就一定要大幅提高 DataStream 在 Bounded 场景下效率。那么为什么 DataStream 的效率要比 DataSet 的效率低呢?
前面我们已经提到,DataStream 原本主要设计目标是给 Unbounded 的场景下使用的,而 Unounded 场景下一个主要的特点就是乱序,也就是说任何一个 DataStream 的算子无法假设处理的 Record 是按照什么顺序进行的,所以许多算子会用一个 K/V 存储来缓存这些乱序的数据,等到合适的时候再从 K/V 存储中取出这些数据进行处理并且进行输出。一般情况下,算子对 K/V 存储访问涉及大量的序列化和反序列化,同时也会引发随机磁盘 I/O;而在 DataSet 中,假设数据是有界的,也就是可以通过优化来避免随机的磁盘 I/O 访问,同时也对序列化和反序列化做了相关优化。这也是为什么用 DataSet 写的 WorkerCount 要比用 DataStream 写的 WordCount 快 5 倍主要原因。
知道到了原因,是不是要把所有的 DataStream 的算子,都重写一遍就可以了呢?理论上没问题,但是 DataStream 有大量的算子需要重写,有些算子还比较复杂,例如与 Window 相关的一系列算子。可以想象到,如果都全部重写,工程量是非常之巨大的。所以我们通过单 Key 的 BatchStateBackend 几乎完全避免了对所有算子重写,同时还得到了非常不错的效果。
对于 Flink 有一定了解的同学应该都知道,原来用 DataStream 写的 Application 都采用 Streaming 的执行模式,在这种模式下是通过 Checkpoint 的方式保持端到端的 Exactly Once 的语义,具体来说一个作业的 Sink 只有当全图的所有算子(包括 Sink 自己)都做完各自的 Snapshot 之后,Sink 才会把数据 Commit 到外部系统中,这是一个典型的依赖 Flink Checkpoint 机制的 2PC 协议。
而在 Bounded 的场景下虽然也可以采用 Streaming 的方式但是对于用户来说可能会存在一些问题:
所以在 Bounded 场景下,用户希望 Application 可以采用 Batch 执行模式,因为利用 Batch 执行的模式可以非常自然的解决上述两个问题。在 Bounded 场景下支持 Batch 的执行模式是比较简单的,但是却引入了一个非常棘手的问题——利用已有的 Sink API 无法保证端到端的 Exactly Once 语义。这是由于 Bounded 场景下是没有 Checkpoint 的,而原有 Sink 都是依赖 Checkpoint 保证端到端的 ExactlyOnce 的。同时我们不希望开发者针对 Sink 在不同模式下开发两套不同的实现,因为这样非常不利用 Flink 和其他生态的对接。
实际上,一个 Transactional 的 Sink 主要解决如下 4 个问题:
而 Flink 应该让 Sink 开发者提供 What to commit 和 How to commit,而系统应该根据不同的执行模式,选择 Where to commit 和 When to commit 来保证端到端的 Exactly Once。最终我们提出了一个全新 Unified Sink API,从而让开发者只开发一套 Sink 就可以同时运行在 Streaming 和 Batch 执行模式下。这里介绍的只是主要思路,在有限流的场景下如何保证 End to End 的一致性;如何对接 Hive、Iceberg 等外部生态,实际上还是存在一定挑战。
四、流批一体 DAG Scheduler
原来 Flink 有两种调度的模式:
总的来说,有了这两种调度方式是可以基本满足流批一体的场景需求,但是也存在着很大的改进空间,具体来说体现在三个方面:
为了既能发挥流引擎的优势,同时避免全图同时调度存在的一些短板,我们引入 Pipeline Region 的概念。Unified DAG Scheduler 允许在一个 DAG 图中,Task 之间既可以通过 Pipeline 通讯,也可以通过 Blocking 方式进行通讯。这些由 Pipeline 的数据交换方式连接的 Task 被称为一个 Pipeline Region。基于以上概念,Flink 引入 Pipeline Region 的概念,不管是流作业还是批作业,都是按照 Pipeline Region 粒度来申请资源和调度任务。细心的读者可以发现,其实原有的两种模式都是 Pipeline Region 调度的特例。
即便可以资源上满足“流”的调度模式,那么哪些任务可以采取“流”的方式调度呢?
有同学还是会担心采取“流”的调度方式容错代价会比较高,因为在“流”的调度方式下,一个 Task 发生错误,和他联通的所有 Task 都会 Fail,然后重新运行。
在 Flink 中,不同 Task 之间有两种连接方式[2],一种是 All-to-All 的连接方式,上游 Task 会和下游的所有的 Task 进行连接;一种是 PointWise 的链接方式,上游的 Task 只会和下游的部分 Task 进行连接。
如果一个作业的所有 Task 之间都是通过 All-to-All 方式进行连接,一旦采取“流”的调度方式,那么整个物理拓扑都需要同时被调度,那么确实存在 FailOver 代价比较高的问题[3]。但是在实际 Batch 作业的拓扑中,Task 之间不都是通过 All-to-All 的边连接,Batch 作业中存在的大量 Task 通过 PointWise 的边连接,通过“流”的方式调度由 PointWise 连接的 Task 连通图,在减少作业的容错成本的同时,可以提高作业的执行效率,如下图所示在,在全量的 10T TPC-DS 测试中,开启所有 PointWise 边都采用 Pipeline 的链接方式就可以让整性能有 20% 以上的性能提升。
上述只是 Schduler 提供的划分 Pipeline Region 的 4 种策略中的一种[4],实际上 Planner 可以根据实际运行场景,定制哪些 Task 之间采取 Pipeline 的传输方式,哪些 Task 之间采取 Batch 的传输方式方式。
调度的本质是给物理执行计划进行资源分配的决策过程。Pipeline Region 解决了物理执行计划确定之后,流作业和批作业可以统一按照 Pipeline Region 的粒度进行调度。对于批作业来说静态生成物理执行计划存在一些问题[5]:
为了解决静态生成物理执行存在这些问题,我们为批作业引入了自适应调度功能[6],和原来的静态物理执行计划相比,利用这个特性可以大幅提高用户资源利用率。 Adaptive Scheduler 可以根据一个 JobVertex 的上游 JobVertex 的执行情况,动态决定当前 JobVertex 的并发度。在未来,我们也可以根据上游 JobVertex 产出的数据,动态决定下游采用什么样的算子。
五、流批一体的 Shuffle 架构
Flink 是一个流批一体的平台,因此引擎对于不同的执行模式要分别提供 Streaming 和Batch 两种类型的 Shuffle。虽然 Streaming Shuffle 和 Batch Shuffle 在具体的策略上存在一定的差异,但是本质上都是为了对数据进行重新划分(re-partition),因此不同的 Shuffle 之间还存在一定的共性。所以我们的目标是提供一套统一的 Shuffle 架构,既可以满足不同 Shuffle 在策略上的定制,同时还能避免在共性需求上进行重复开发。
总体来说,Shuffle 架构可以划分成如下图所示的四个层次。流和批的 Shuffle 需求在各层上有一定差异,也有大量的共性,下边我做了一些简要分析。
大家都知道,批作业和流作业对 Shuffle 的需求是有差异的,具体可以体现在如下 3 个方面:
批作业和流作业的 Shuffle 有差异也有共性,共性主要体现在:
Unified Shuffle 架构抽象出三个组件[9]: Shuffle Master、Shuffle Reader、Shuffle Writer。Flink通过和这三个组件交互完成算子间的数据的重新划分。通过这三个组件可以满足不同Shuffle插件在具体策略上的差异:
同时,我们也为流批 Shuffle 的共性——Meta 管理、数据传输、服务部署[10]——提供了架构层面的支持,从而避免对复杂组件的重复开发。高效稳定的数据传输,是分布式系统最复杂的子系统之一,例如在传输中都要解决上下游反压、数据压缩、内存零拷贝等问题,在新的架构中只要开发一遍,就可以同时在流和批两种场景下共同使用,大大减少了开发和维护的成本。
六、流批一体的容错策略
Flink 原有容错策略是以检查点为前提的,具体来说无论是单个 Task 出现失败还是JobMaster 失败,Flink 都会按照最近的检查点重启整个作业。虽然这种策略存在一定的优化空间,但是总的来说对于流的场景是基本是接受的。目前,Flink Batch 运行模式下不会开启检查点[11],这也意味一旦出现任何错误,整个作业都要从头执行。
虽然原有策略在理论上可以保证最终一定会产出正确的结果,但是明显大多数客户都无法接受这种容错策略所付出的代价。为了解决这些问题,我们分别对 Task 和 JM 的容错都做了相应的改进。
虽然在 Batch 执行模式下没有定时的 Checkpoint,但是在 Batch 执行模式下,Flink允许 Task 之间通过 Blocking Shuffle 进行通信。对于读取 Blocking Shuffle 的 Task 发生失败之后,由于 Blocking Shuffle 中存储了这个 Task 所需要的全部数据,所以只需要重启这个 Task 以及通过 Pipeline Shuffle 与其相连的全部下游任务即可,而不需要重启整个作业。
总的来说,Pipeline Region Failover 策略和 Scheduler 在进行正常调度的时候一样,都是把一个 DAG 拆分成由若干 Pipeline shuffle 连接的一些 Pipeline Region,每当一个 Task 发生 FailOver 的时候,只会重启这个 Task 所在的 Region 即可。
JM 是一个作业的控制中心,包含了作业的各种执行状态。Flink 利用这些状态对任务进行调度和部署。一旦 JM 发生错误之后,这些状态将会全部丢失。如果没有这些信息,即便所有的工作节点都没有发生故障,新 JM 仍然无法继续调度原来的作业。例如,由于任务的结束信息都已丢失,一个任务结束之后,新 JM 无法判断现有的状态是否满足调度下游任务的条件——所有的输入数据都已经产生。
从上边的分析可以看出,JM Failover 的关键就是如何让一个 JM“恢复记忆”。在 VVR[12] 中我们通过基于 Operation Log 机制恢复 JM 的关键状态。
细心的同学可能已经发现了,虽然这两个改进的出发点是为了批的场景,但是实际上对于流的作业容也同样有效。上边只是简要的介绍了两种容错策略的思路,实际上还有很多值得思考的内容。例如 Blocking 上游数据丢失了我们应该如何处理?JM 中有哪些关键的状态需要恢复?
七、未来展望
为了提供比现在更快、更稳的用户体验,我们已经开始了下一代流式架构的研发;Flink在流批一体的场景下得到了越来越多用户的认可,但是我们也知道业界还有很多高水平传统大数据系统值得我们学习。最后我也希望感兴趣的小伙伴可以加入我们,一起打造一个具有完美用户体验的流批一体大数据计算引擎。
注释:
[1] Streaming 和 Batch 是两种执行模式和语义无关,Streaming 执行模式可以简单的理解为,Task 之间采用 Pipeline 的 Shuffle;Batch 执行模式可以简单的理解为,Task 之间采用 Blocking 的 Shuffle 模式。
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/runtime/jobgraph/DistributionPattern.html
[3] 我们正在开发 Adaptive 的 Shuffle 模式,利用这种模式可以避免”纯”Pipeline的方式引发的容错代价过高的问题。
[4] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.html
[5] 对于流作业来说,静态物理执行计划也有和批类似的问题,我们提供了一个AutoPilot的系统来动态修改物理执行计划。由于AutoPilot 属于独立的服务,不属于执行引擎这里就不展开赘述了。
[6] 由于时间规划的原因,这个功能暂时只存在我们的商业化版本的执行引擎VVR中
[7] 在一些情况下,批Shuffle Service也会和计算节点部署在一起。例如,在Flink Session的模式下,虽然Shuffle Service和计算部署在一起有一定的稳定性代价,但是对部分用户来说这种部署模式是在成本和稳定性之间权衡之下的一个结果。所以在一定程度上,流批Shuffle在部署方面也是有共性的,只是不是完全相同而已。
[8] 把批作业的计算和Shuffle部署在一个节点内所存在的问题:资源利用率低、成本高。如果没有计算任务继续部署该节点上那么这个节点上计算资源就会被浪费掉,计算资源提早释放也会节省用户成本;性能无法达到最优 由于一个节点只能看到部分的Shuffle数据,因此一个Reduce需要从n个节点上拉取自己的数据,这会引发大量的随机IO读,这样大量的随机读IO会大大降低作业性能;稳定性 一旦结点挂掉,整个节点所负责的Shuffle数据就会丢失,然后就会触发作业重新计算,这种重新计算的代价都是比较高的。(Task中包含用户代码,所以此种结点down掉的概率会大于存储计算分离情况下的Shuffle结点。)
[9] 由于历史原因,大家在读Flink代码的时候看到不是Reader和Writer,而是ResultPartion/InputGate。这里用Reader和Writer是为了降低刚接触Flink的同学的理解门槛。
[10] 部署为什么也算共性,可以参考[7]。
[11] 虽然理论上批作业可以支持检查点,但是在批的场景下,开启原生的流式Checkpoint成本是比较高的的。当然这也不是完全排除未来可能会发现比较合适场景;
[12] VVR是Flink商业产品的执行引擎,由于时间规划的原因,这个功能暂时还没有回馈给Flink社区
作者:马国维(黎钢)
来源:微信公众号: Flink 中文社区
出处
:https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247491453&idx=1&sn=
358f1beffd6392edeef52cb13ed06230