0%

Flink Advanced

undefined

Flink进阶的知识点和文章整理

Flink的应用场景

前段时间被人问了一个很简单的问题,那就是Flink应该使用在什么应用场景下

乍一听很简单的问题,我却没法提供让我自己满意的回答,只是说需要用到实时计算的地方都刻意用Flink,表述的不够明确,OK,这边有空来用专业的术语和详细的例子总结一下

  • 事件驱动型应用
  • 数据分析型应用
  • 数据管道型应用

事件驱动型应用场景

社交领域

比如在twitter上,我们点击关注某人,点击之后,点击作为一个事件,以数据的方式被接受计算,被关注者的粉丝数量+1,关注者的关注数量+1,两个数据收到了影响,这个就叫事件驱动型应用。

电商

当刷单者疯狂买东西然后每天给很多个好评,持续很多天的时候,这个事件作为一条数据被抛出警告,提醒淘宝平台这个账号有问题,可能在刷单,这也是事件驱动。

如果某个商家收到来自不同用户的大量投诉,那么可能是店出现了问题,同样提醒淘宝平台。

在淘宝的推荐系统中

微信截图_20201114154629.png

购买东西作为事件,可以看到会实时触发淘宝的推荐系统进行类似推荐。

金融行业

比如说,有人在银行ATM机里面通过摄像头被算法识别到遮挡面部或者遮挡摄像头等特征的时候,会触发一些事件,比如说通知安保人员等等。

事件驱动型应用是一类具有状态的应用,该应用会根据事件流中的时间触发计算,更新状态或进行外部系统操作。事件驱动型应用常见于实时计算业务中,比如上面的:实时推荐系统,金融反欺诈系统,实时规则预警系统等。

上述的这种场景毫无疑问是优先于Flink出现而出现的,那么我们来看一下原来我们是如何解决这些问题的

微信截图_20201114200806.png

这个是我们最开始的解决此类问题的架构

对于一些数据规模是很大,实时性要求不是很高的场景来说,效果是很好的,但是随着数量的不断增加,尤其是数据爆炸时代背景下的海量数据,以关系型数据为例,这里我们需要使用分库分表的方式来支持海量数据,这种方式由于数据库的各种限制,比如说事务一致性的控制,数据备份机制,全局索引等等数据库的实际的架构方面的原因,随着数据量的增加,写入和查询的性能会受到很大的挑战,尤其在实时性要求更高的场景,我们需要更优秀的架构。

微信截图_20201114205428.png

这个是引入了Flink之后的架构,在一个完整的时间驱动型架构中往往会涉及到已有数据实体的存储,实体之间关系的维护,统计报表的持久化等需求,所以我们在这个架构中仍然需要用到传统关系型数据库,那么在这个架构中,触发计算的部分,我们交给Apache Flink处理,计算结果既可以存储到支持海量数据的Hadoop生态中的HDFS上,我们也可以将计算结果和统计信息存储在传统数据库中供实时查询。

那么这种解决方案有什么优势呢?

实时性。为了数据价值最大化,从上面的场景中,我们可以很明显的体会到数据实时反馈的重要性,Flink为了实时计算处理,进行了针对实时流计算的定制特性,比如丰富的状态支持,多窗口语义支持,灵活的Timer和Trigger机制,强大的CEP机制等等等,同时进行了大量的性能优化,这里限于篇幅不能一一列举,后面有机会会再介绍。

数据分析型应用

So,怎样的应用才是数据分析型应用呢?

数据分析型应用是一个非常宽泛的范围,凡是数据分析型应用都是从原始数据中提取有价值的信息和指标

传统的数据分析方案是一次查询,获得一次查询 获得一次结果,为了实现始终得到最新的结果,需要自己定义触发器(到了某个设定的条件,自动执行查询)也就是我们说的Batch方式的查询方式。

而得益于Timer和State的支持,Flink能够持续查询,Flink的Timer能够持续触发计算,State能够保存上次结果增加计算,一次查询能够源源不断的获得最新的结果。

为了体现Flink的优秀性能,我举一些特定例子

比如说,几乎要求零延迟的天猫/京东双十一大屏

微信截图_20201114230411.png

这个统计结果几乎是要求零延迟展示给大家的,现在几家巨头电商的双十一计算逻辑都是在Flink中实现的,侧面验证了Flink在快和处理数据量巨大这两点上的可靠性。

实际上更多的业务可能既既是事件驱动型应用又是数据分析型应用,如果一定要定义这两种区别的话,我们可以从处理目的上来定义,事件驱动型应用是数据最后会直接派发新动作,比如上面的ATM应用检测到不正常动作就触发报警。而数据分析我们只是产生数据,让决策者根据结果数据为现实事件做决定。

数据管道型应用

我们在采集数据的时候往往会根据数据的特性(数据量的大小,数据的结构)采用不同的数据库来存储数据,但是我们在使用数据的时候如果需要进行流处理,那么我们要针对不用的数据库开发的数据库开发上面所述的不同的触发器,大大增加开发成本。

现在Flink已经集成了CDC功能,正在完善对各个流行的数据库的解析模块,随着时间的推移,最后会成为一个完美的数据管道。

流处理的历史

这里把流处理的历史放在了第二部分,上面花了相当的篇幅来描述了什么是Flink(流处理),先弄懂了这个,然后再介绍历史,应该会好一些。

为了同时获得准确性和低延迟,为了让当时的数据处理系统,可以同时提供快速和准确的结果,人们设计了所谓的lambda架构。

undefined

lambda架构增强了传统的批处理架构,其“快速层”(speed layer)由低延迟的流处理器来支持。数据到达之后由流处理器提取出来,并写入批处理存储。流处理器近乎实时地计算近似结果并将它们写入“快速表”(speed table)。批处理器定期处理批量存储中的数据,将准确的结果写入批处理表,并从速度表中删除相应的不准确结果。应用程序会合并快速表中的近似结果和批处理表中的准确结果,然后消费最终的结果。

lambda架构现在已经不再是最先进的,但仍在许多地方使用。该体系结构的最初目标是改善原始批处理分析体系结构的高延迟。但是,它有一些明显的缺点。首先,它需要对一个应用程序,做出两个语义上等效的逻辑实现,用于两个独立的、具有不同API的处理系统。其次,流处理器计算的结果只是近似的。第三,lambda架构很难建立和维护。

通过在第一代基础上进行改进,下一代分布式开源流处理器(2013)提供了更好的故障保证,并确保在发生故障时,每个输入记录仅对结果产生一次影响(exactly -once)。此外,编程API从相当低级的操作符接口演变为高级API。但是,一些改进(例如更高的吞吐量和更好的故障保证)是以将处理延迟从毫秒增加到几秒为代价的。此外,结果仍然取决于到达事件的时间和顺序。

第三代分布式开源流处理器(2015)解决了结果对到达事件的时间和顺序的依赖性。结合精确一次(exactly-once)的故障语义,这一代系统是第一个具有计算一致性和准确结果的开源流处理器。通过基于实际数据来计算结果(“重演”数据),这些系统还能够以与“实时”数据相同的方式处理历史数据。另一个改进是解决了延迟/吞吐量无法同时保证的问题。先前的流处理器仅能提供高吞吐量或者低延迟(其中之一),而第三代系统能够同时提供这两个特性。这一代的流处理器使得lambda架构过时了。当然,这一代流处理以flink为代表。

除了目前讨论的特性,例如容错、性能和结果准确性之外,流处理器还不断添加新的操作功能,例如高可用性设置,与资源管理器(如YARN或Kubernetes)的紧密集成,以及能够动态扩展流应用程序。其他功能包括:支持升级应用程序代码,或将作业迁移到其他群集或新版本的流处理器,而不会丢失当前状态。

Flink基础:流处理基础

数据流编程简介

数据流图

之前做了一个纯Java分析项目,大大加深了我对流处理的认识,只要是流处理,总是会包含的一些特性 ==> 我们把这些特性称之为流处理特性。

顾名思义,数据流程序描述了数据如何在算子之间流动。数据流程序通常表示为有向图,其中节点称为算子用来表示计算,边表示数据之间的依赖性。算子是数据流程序的基本功能单元。他们从输入消耗数据,对它们执行计算,并生成数据输出用于进一步处理。一个数据流图必须至少有一个数据源和一个数据接收器。

undefined

上面的图只能叫逻辑流图,没有考虑在实际执行过程中的物理机的问题,下面是物理数据流图

undefined

在物理数据流图中,节点是任务。“Extract hashtags”和“Count”算子有两个并行算子任务,每个算子任务对输入数据的子集执行计算。

数据并行与任务并行

我们可以以不同方式利用数据流图中的并行性。第一,我们可以对输入数据进行分区,并在数据的子集上并行执行具有相同算子的任务并行。这种类型的并行性被称为数据并行性。数据并行是有用的,因为它允许处理大量数据,并将计算分散到不同的计算节点上。第二,我们可以将不同的算子在相同或不同的数据上并行执行。这种并行性称为任务并行性。使用任务并行性,我们可以更好地利用计算资源。

数据交换策略

undefined

  • 前向策略将数据从一个任务发送到接收任务。如果两个任务都位于同一台物理计算机上(这通常由任务调度器确保),这种交换策略可以避免网络通信。
  • 广播策略将所有数据发送到算子的所有的并行任务上面去。因为这种策略会复制数据和涉及网络通信,所以代价相当昂贵。
  • 基于键控的策略通过Key值(键)对数据进行分区保证具有相同Key的数据将由同一任务处理。在图2-2中,输出“Extract hashtags”算子使用键来分区(hashtag),以便count算子的任务可以正确计算每个#标签的出现次数。
  • 随机策略统一将数据分配到算子的任务中去,以便均匀地将负载分配到不同的计算任务。

并行处理流数据

首先,让我们定义术语数据流:数据流是一个可能无限的事件序列。

数据流中的事件可以表示监控数据,传感器测量数据,信用卡交易数据,气象站观测数据,在线用户交互数据,网络搜索数据等。在本节中,我们将学习如何并行处理无限流,使用数据流编程范式。

延迟和吞吐量

流处理程序不同与批处理程序。在评估性能时,要求也有所不同。对于批处理程序,我们通常关心一个作业的总的执行时间,或我们的处理引擎读取输入所需的时间,执行计算,并回写结果。由于流处理程序是连续运行的,输入可能是无界的,所以数据流处理中没有总执行时间的概念。 相反,流处理程序必须尽可能快的提供输入ew数据的计算结果。我们使用延迟和吞吐量来表征流处理的性能要求。

延迟

延迟表示处理事件所需的时间

在数据流中,延迟是以时间为单位测量的,例如毫秒。根据应用程序,我们可能会关心平均延迟,最大延迟或百分位延迟。例如,平均延迟值为10ms意味着处理事件的平均时间在10毫秒内。或者,延迟值为95%,10ms表示95%的事件在10ms内处理完毕。平均值隐藏了处理延迟的真实分布,可能会让人难以发现问题。

确保低延迟对于许多流应用程序来说至关重要,例如欺诈检测,系统警报,网络监控和提供具有严格服务水平协议的服务。低延迟是流处理的关键特性,它实现了我们所谓的实时应用程序。像Apache Flink这样的现代流处理器可以提供低至几毫秒的延迟。

吞吐量

吞吐量是衡量系统处理能力的指标,也就是处理速率。

数据流上的操作

流处理引擎通畅提供的一组内置操作:

摄取(ingest),转换(transform)和输出流(output)

操作可以是无状态的或有状态的。无状态操作不保持任何内部状态。也就是说,事件的处理不依赖于过去看到的任何事件,也没有保留历史。 无状态操作很容易并行化,因为事件可以彼此独立地处理,也独立于事件到达的顺序(和事件到达顺序没有关系)。 而且,在失败的情况下,无状态操作可以是简单的重新启动并从中断处继续处理。相反, 有状态操作可能会维护之前收到的事件的信息。此状态可以通过传入事件更新,也可以用于未来事件的处理逻辑。有状态的流 处理应用程序更难以并行化和以容错的方式来运行,因为状态需要有效的进行分区和在发生故障的情况下可靠地恢复。

数据摄入和数据吞吐量

数据摄取和数据出口操作允许流处理程序与外部系统通信。数据摄取是操作从外部源获取原始数据并将其转换为其他格式(ETL)。实现数据提取逻辑的运算符被称为数据源。数据源可以从TCP Socket,文件,Kafka Topic或传感器数据接口中提取数据。数据出口是以适合消费的形式产出到外部系统。执行数据出口的运算符称为数据接收器,包括文件,数据库,消息队列和监控接口。

转换算子

undefined

转换算子是单遍处理算子,碰到一个事件处理一个事件。这些操作在使用后会消费一个事件,然后对事件数据做一些转换,产生一个新的输出流。转换逻辑可以集成在 操作符中或由UDF函数提供。

操作符可以接受多个输入流并产生多个输出流。他们还可以通过修改数据流图的结构要么将流分成多个流,要么将流合并为一条流。

滚动聚合

滚动聚合是一种聚合,例如sum,minimum和maximum,为每个输入事件不断更新。 聚合操作是有状态的,并将当前状态与传入事件一起计算以产生更新的聚合值。请注意能够有效地将当前状态与事件相结合 产生单个值,聚合函数必须是关联的和可交换的。否则,操作符必须存储完整的流数据历史。

undefined

窗口操作符

转换和滚动聚合一次处理一个事件产生输出事件并可能更新状态。但是,有些操作必须收集并缓冲数据以计算其结果。 例如,考虑不同流之间的连接或整体聚合这样的操作,例如中值函数。为了在无界流上高效运行这些操作符,我们需要限制 这些操作维护的数据量。在本节中,我们将讨论窗口操作,提供此服务。

窗口还可以在语义上实现关于流的比较复杂的查询。我们已经看到了滚动聚合的方式,以聚合值编码整个流的历史数据来为每个事件提供低延迟的结果。 但如果我们只对最近的数据感兴趣的话会怎样?考虑给司机提供实时交通信息的应用程序。这个程序可以使他们避免拥挤的路线。在这种场景下,你想知道某个位置在最近几分钟内是否有事故发生。 另一方面,了解所有发生过的事故在这个应用场景下并没有什么卵用。更重要的是,通过将流历史缩减为单一聚合值,我们将丢失这段时间内数据的变化。例如,我们可能想知道每5分钟有多少车辆穿过 某个路口。

窗口操作不断从无限事件流中创建有限的事件集,好让我们执行有限集的计算。通常会基于数据属性或基于时间的窗口来分配事件。 要正确定义窗口运算符语义,我们需要确定如何给窗口分配事件以及对窗口中的元素进行求值的频率是什么样的。 窗口的行为由一组策略定义。窗口策略决定何时创建新的窗口以及要分配的事件属于哪个窗口,以及何时对窗口中的元素进行求值。 而窗口的求值基于触发条件。一旦触发条件得到满足,窗口的内容将会被发送到求值函数,求值函数会将计算逻辑应用于窗口中的元素。 求值函数可以是sum或minimal或自定义的聚合函数。 求值策略可以根据时间或者数据属性计算(例如,在过去五秒内收到的事件或者最近的一百个事件等等)。 接下来,我们描述常见窗口类型的语义。

  • 滚动窗口是将事件分配到固定大小的不重叠的窗口中。当通过窗口的结尾时,全部事件被发送到求值函数进行处理。基于计数的滚动窗口定义了在触发求值之前需要收集多少事件。下图显示了一个基于计数的翻滚窗口,每四个元素一个窗口。基于时间的滚动窗口定义一个时间间隔,包含在此时间间隔内的事件。下下图显示了基于时间的滚动窗口,将事件收集到窗口中每10分钟触发一次计算。

undefined

undefined

  • 滑动窗口将事件分配到固定大小的重叠的窗口中去。因此,事件可能属于多个桶。我们通过提供窗口的长度和滑动距离来定义滑动窗口。滑动距离定义了创建新窗口的间隔。基于滑动计数的窗口,下图长度为四个事件,三个为滑动距离。

undefined

  • 会话窗口在常见的真实场景中很有用,一些场景既不能使用滚动窗口也不能使用滑动窗口。考虑一个分析在线用户行为的应用程序。在应用程序里,我们想把源自同一时期的用户活动或会话事件分组在一起。会话由一系列相邻时间发生的事件组成,接下来有一段时间没有活动。例如,用户在App上浏览一系列的新闻,然后关掉App,那么浏览新闻这段时间的浏览事件就是一个会话。会话窗口事先没有定义窗口的长度,而是取决于数据的实际情况,滚动窗口和滑动窗口无法应用于这个场景。相反,我们需要将同一会话中的事件分配到同一个窗口中去,而不同的会话可能窗口长度不一样。会话窗口会定义一个间隙值来区分不同的会话。间隙值的意思是:用户一段时间内不活动,就认为用户的会话结束了。下图显示了一个会话窗口。

undefined

到目前为止,所有窗口类型都是在整条流上去做窗口操作。但实际上你可能想要将一条流分流成多个逻辑流并定义并行窗口。 例如,如果我们正在接收来自不同传感器的测量结果,那么可能想要在做窗口计算之前按传感器ID对流进行分流操作。 在并行窗口中,每条流都独立于其他流,然后应用了窗口逻辑。下图显示了一个基于计数的长度为2的并行滚动窗口,根据事件颜色分流。

undefined

在流处理中,窗口操作与两个主要概念密切相关:时间语义和状态管理。时间也许是流处理最重要的方面。即使低延迟是流处理的一个有吸引力的特性,它的真正价值不仅仅是快速分析。真实世界的系统,网络和通信渠道远非完美,流数据经常被推迟或无序(乱序)到达。理解如何在这种条件下提供准确和确定的结果是至关重要的。 更重要的是,流处理程序可以按原样处理事件制作的也应该能够处理相同的历史事件方式,从而实现离线分析甚至时间旅行分析。 当然,前提是我们的系统可以保存状态,因为可能有故障发生。到目前为止,我们看到的所有窗口类型在产生结果前都需要保存之前的数据。实际上,如果我们想计算任何指标,即使是简单的计数,我们也需要保存状态。考虑到流处理程序可能会运行几天,几个月甚至几年,我们需要确保状态可以在发生故障的情况下可靠地恢复。 并且即使程序崩溃,我们的系统也能保证计算出准确的结果。本章,我们将在流处理应用可能发生故障的语境下,深入探讨时间和状态的概念。

时间语义

在流处理中一分钟代表什么?

在处理可能是无限的事件流(包含了连续到达的事件),时间成为流处理程序的核心方面。假设我们想要连续的计算结果,可能每分钟就要计算一次。在我们的流处理程序上下文中,一分钟的意思是什么?

考虑一个程序需要分析一款移动端的在线游戏的用户所产生的事件流。游戏中的用户分了组,而应用程序将收集每个小组的活动数据,基于小组中的成员多快达到了游戏设定的目标,然后在游戏中提供奖励。例如额外的生命和用户升级。例如,如果一个小组中的所有用户在一分钟之内都弹出了500个泡泡,他们将升一级。Alice是一个勤奋的玩家,她在每天早晨的通勤时间玩游戏。问题在于Alice住在柏林,并且乘地铁去上班。而柏林的地铁手机信号很差。我们设想一个这样的场景,Alice当她的手机连上网时,开始弹泡泡,然后游戏会将数据发送到我们编写的应用程序中,这时地铁突然进入了隧道,她的手机也断网了。Alice还在玩这个游戏,而产生的事件将会缓存在手机中。当地铁离开隧道,Alice的手机又在线了,而手机中缓存的游戏事件将发送到应用程序。我们的应用程序应该如何处理这些数据?在这个场景中一分钟的意思是什么?这个一分钟应该包含Alice离线的那段时间吗?下图展示了这个问题。

undefined

在线手游是一个简单的场景,展示了应用程序的运算应该取决于事件实际发生的时间,而不是应用程序收到事件的时间。如果我们按照应用程序收到事件的时间来进行处理的话,最糟糕的后果就是,Alice和她的朋友们再也不玩这个游戏了。但是还有很多时间语义非常关键的应用程序,我们需要保证时间语义的正确性。如果我们只考虑我们在一分钟之内收到了多少数据,我们的结果会变化,因为结果取决于网络连接的速度或处理的速度。相反,定义一分钟之内的事件数量,这个一分钟应该是数据本身的时间。

在Alice的这个例子中,流处理程序可能会碰到两个不同的时间概念:处理时间和事件时间。我们将在接下来的部分,讨论这两个概念。

处理时间

处理时间是处理流的应用程序的机器的本地时钟的时间(墙上时钟)。处理时间的窗口包含了一个时间段内来到机器的所有事件。这个时间段指的是机器的墙上时钟。如下图所示,在Alice的这个例子中,处理时间窗口在Alice的手机离线的情况下,时间将会继续行走。但这个处理时间窗口将不会收集Alice的手机离线时产生的事件。

undefined

事件时间

  事件时间是流中的事件实际发生的时间。事件时间基于流中的事件所包含的时间戳。通常情况下,在事件进入流处理程序前,事件数据就已经包含了时间戳。下图展示了事件时间窗口将会正确的将事件分发到窗口中去。可以如实反应事情是怎么发生的。即使事件可能存在延迟。

undefined

​ 事件时间使得计算结果的过程不需要依赖处理数据的速度。基于事件时间的操作是可以预测的,而计算结果也是确定的。无论流处理程序处理流数据的速度快或是慢,无论事件到达流处理程序的速度快或是慢,事件时间窗口的计算结果都是一样的。

  可以处理迟到的事件只是我们使用事件时间所克服的一个挑战而已。普遍存在的事件乱序问题可以使用事件时间得到解决。考虑和Alice玩同样游戏的Bob,他恰好和Alice在同一趟地铁上。Alice和Bob虽然玩的游戏一样,但他们的手机信号是不同的运营商提供的。当Alice的手机没信号时,Bob的手机依然有信号,游戏数据可以正常发送出去。

  如果使用事件时间,即使碰到了事件乱序到达的情况,我们也可以保证结果的正确性。还有,当我们在处理可以重播的流数据时,由于时间戳的确定性,我们可以快进过去。也就是说,我们可以重播一条流,然后分析历史数据,就好像流中的事件是实时发生一样。另外,我们可以快进历史数据来使我们的应用程序追上现在的事件,然后应用程序仍然是一个实时处理程序,而且业务逻辑不需要改变。

水位线

  在我们对事件时间窗口的讨论中,我们忽略了一个很重要的方面:我们应该怎样去决定何时触发事件时间窗口的计算?也就是说,在我们可以确定一个时间点之前的所有事件都已经到达之前,我们需要等待多久?我们如何知道事件是迟到的?在分布式系统无法准确预测行为的现实条件下,以及外部组件所引发的事件的延迟,以上问题并没有准确的答案。在本小节中,我们将会看到如何使用水位线来设置事件时间窗口的行为。

  水位线是全局进度的度量标准。系统可以确信在一个时间点之后,不会有早于这个时间点发生的事件到来了。本质上,水位线提供了一个逻辑时钟,这个逻辑时钟告诉系统当前的事件时间。当一个运算符接收到含有时间T的水位线时,这个运算符会认为早于时间T的发生的事件已经全部都到达了。对于事件时间窗口和乱序事件的处理,水位线非常重要。运算符一旦接收到水位线,运算符会认为一段时间内发生的所有事件都已经观察到,可以触发针对这段时间内所有事件的计算了。

  水位线提供了一种结果可信度和延时之间的妥协。激进的水位线设置可以保证低延迟,但结果的准确性不够。在这种情况下,迟到的事件有可能晚于水位线到达,我们需要编写一些代码来处理迟到事件。另一方面,如果水位线设置的过于宽松,计算的结果准确性会很高,但可能会增加流处理程序不必要的延时。

  在很多真实世界的场景里面,系统无法获得足够的知识来完美的确定水位线。在手游这个场景中,我们无法得知一个用户离线时间会有多长,他们可能正在穿越一条隧道,可能正在乘飞机,可能永远不会再玩儿了。水位线无论是用户自定义的或者是自动生成的,在一个分布式系统中追踪全局的时间进度都不是很容易。所以仅仅依靠水位线可能并不是一个很好的主意。流处理系统还需要提供一些机制来处理迟到的元素(在水位线之后到达的事件)。根据应用场景,我们可能需要把迟到事件丢弃掉,或者写到日志里,或者使用迟到事件来更新之前已经计算好的结果。

处理时间和事件时间

  大家可能会有疑问,既然事件时间已经可以解决我们的所有问题,为什么我们还要对比这两个时间概念?真相是,处理时间在很多情况下依然很有用。处理时间窗口将会带来理论上最低的延迟。因为我们不需要考虑迟到事件以及乱序事件,所以一个窗口只需要简单的缓存窗口内的数据即可,一旦机器时间超过指定的处理时间窗口的结束时间,就会触发窗口的计算。所以对于一些处理速度比结果准确性更重要的流处理程序,处理时间就派上用场了。另一个应用场景是,当我们需要在真实的时间场景下,周期性的报告结果时,同时不考虑结果的准确性。一个例子就是一个实时监控的仪表盘,负责显示当事件到达时立即聚合的结果。最后,处理时间窗口可以提供流本身数据的忠实表达,对于一些案例可能是很必要的特性。例如我们可能对观察流和对每分钟事件的计数(检测可能存在的停电状况)很感兴趣。简单的说,处理时间提供了低延迟,同时结果也取决于处理速度,并且也不能保证确定性。另一方面,事件时间保证了结果的确定性,同时还可以使我们能够处理迟到的或者乱序的事件流。

状态和持久化模型

我们现在转向另一个对于流处理程序非常重要的话题:状态。在数据处理中,状态是普遍存在的。任何稍微复杂一点的计算,都涉及到状态。为了产生计算结果,一个函数在一段时间内的一定数量的事件上来累加状态(例如,聚合计算或者模式匹配)。有状态的运算符使用输入的事件以及内部保存的状态来计算得到输出。例如,一个滚动聚合运算符需要输出这个运算符所观察到的所有事件的累加和。这个运算符将会在内部保存当前观察到的所有事件的累加和,同时每输入一个事件就更新一次累加和的计算结果。相似的,当一个运算符检测到一个“高温”事件紧接着十分钟以内检测到一个“烟雾”事件时,将会报警。直到运算符观察到一个“烟雾”事件或者十分钟的时间段已经过去,这个运算符需要在内部状态中一直保存着“高温”事件。

当我们考虑一下使用批处理系统来分析一个无界数据集时,会发现状态的重要性显而易见。在现代流处理器兴起之前,处理无界数据集的一个通常做法是将输入的事件攒成微批,然后交由批处理器来处理。当一个任务结束时,计算结果将被持久化,而所有的运算符状态就丢失了。一旦一个任务在计算下一个微批次的数据时,这个任务是无法访问上一个任务的状态的(都丢掉了)。这个问题通常使用将状态代理到外部系统(例如数据库)的方法来解决。相反,在一个连续不间断运行的流处理任务中,事件的状态是一直存在的,我们可以将状态暴露出来作为编程模型中的一等公民。当然,我们的确可以使用外部系统来管理流的状态,即使这个解决方案会带来额外的延迟。

由于流处理运算符默认处理的是无界数据流。所以我们必须要注意不要让内部状态无限的增长。为了限制状态的大小,运算符通常情况下会保存一些之前所观察到的事件流的总结或者概要。这个总结可能是一个计数值,一个累加和,或者事件流的采样,窗口的缓存操作,或者是一个自定义的数据结构,这个数据结构用来保存数据流中感兴趣的一些特性。

我们可以想象的到,支持有状态的运算符可能会碰到一些实现上的挑战:

状态管理

系统需要高效的管理状态,并保证针对状态的并发更新,不会产生竞争条件(race condition)。

状态分区

并行会带来复杂性。因为计算结果同时取决于已经保存的状态和输入的事件流。幸运的是,大多数情况下,我们可以使用Key来对状态进行分区,然后独立的管理每一个分区。例如,当我们处理一组传感器的测量事件流时,我们可以使用分区的运算符状态来针对不同的传感器独立的保存状态。

状态恢复

第三个挑战是有状态的运算符如何保证状态可以恢复,即使出现任务失败的情况,计算也是正确的。

任务失败

流任务中的运算符状态是很宝贵的,也需要抵御任务失败带来的问题。如果在任务失败的情况下,状态丢失的话,在任务恢复以后计算的结果将是不正确的。流任务会连续不断的运行很长时间,而状态可能已经收集了几天甚至几个月。在失败的情况下,重新处理所有的输入并重新生成一个丢失的状态,将会很浪费时间,开销也很大。

什么是任务失败?

  对于流中的每一个事件,一个处理任务分为以下步骤:

  (1)接收事件,并将事件存储在本地的缓存中;

  (2)可能会更新内部状态;

  (3)产生输出记录。这些步骤都能失败,而系统必须对于在失败的场景下如何处理有清晰的定义。如果任务在第一步就失败了,事件会丢失吗?如果当更新内部状态的时候任务失败,那么内部状态会在任务恢复以后更新吗?在以上这些场景中,输出是确定性的吗?

  在批处理场景下,所有的问题都不是问题。因为我们可以很方便的重新计算。所以不会有事件丢失,状态也可以得到完全恢复。在流的世界里,处理失败不是一个小问题。流系统在失败的情况下需要保证结果的准确性。接下来,我们需要看一下现代流处理系统所提供的一些保障,以及实现这些保障的机制。

结果的保证

当我们讨论保证计算的结果时,我们的意思是流处理器的内部状态需要保证一致性。也就是说我们关心的是应用程序的代码在故障恢复以后看到的状态值是什么。要注意保证应用程序状态的一致性并不是保证应用程序的输出结果的一致性。一旦输出结果被持久化,结果的准确性就很难保证了。除非持久化系统支持事务。

AT-MOST-ONCE

  当任务故障时,最简单的做法是什么都不干,既不恢复丢失的状态,也不重播丢失的事件。At-most-once语义的含义是最多处理一次事件。换句话说,事件可以被丢弃掉,也没有任何操作来保证结果的准确性。这种类型的保证也叫“没有保证”,因为一个丢弃掉所有事件的系统其实也提供了这样的保障。没有保障听起来是一个糟糕的主意,但如果我们能接受近似的结果,并且希望尽可能低的延迟,那么这样也挺好。

AT-LEAST-ONCE

  在大多数的真实应用场景,我们希望不丢失事件。这种类型的保障成为at-least-once,意思是所有的事件都得到了处理,而且一些事件还可能被处理多次。如果结果的正确性仅仅依赖于数据的完整性,那么重复处理是可以接受的。例如,判断一个事件是否在流中出现过,at-least-once这样的保证完全可以正确的实现。在最坏的情况下,我们多次遇到了这个事件。而如果我们要对一个特定的事件进行计数,计算结果就可能是错误的了。

  为了保证在at-least-once语义的保证下,计算结果也能正确。我们还需要另一套系统来从数据源或者缓存中重新播放数据。持久化的事件日志系统将会把所有的事件写入到持久化存储中。所以如果任务发生故障,这些数据可以重新播放。还有一种方法可以获得同等的效果,就是使用结果承认机制。这种方法将会把每一条数据都保存在缓存中,直到数据的处理等到所有的任务的承认。一旦得到所有任务的承认,数据将被丢弃。

EXACTLY-ONCE

  恰好处理一次是最严格的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。本质上,恰好处理一次语义意味着我们的应用程序可以提供准确的结果,就好像从未发生过故障。

  提供恰好处理一次语义的保证必须有至少处理一次语义的保证才行,同时还需要数据重放机制。另外,流处理器还需要保证内部状态的一致性。也就是说,在故障恢复以后,流处理器应该知道一个事件有没有在状态中更新。事务更新是达到这个目标的一种方法,但可能引入很大的性能问题。Flink使用了一种轻量级快照机制来保证恰好处理一次语义。

端到端恰好处理一次

  目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的。而在真实世界中,流处理应用除了流处理器以外还包含了数据源(例如Kafka)和持久化系统。端到端的一致性保证意味着结果的正确性贯穿了整个流处理应用的始终。每一个组件都保证了它自己的一致性。而整个端到端的一致性级别取决于所有组件中一致性最弱的组件。要注意的是,我们可以通过弱一致性来实现更强的一致性语义。例如,当任务的操作具有幂等性时,比如流的最大值或者最小值的计算。在这种场景下,我们可以通过最少处理一次这样的一致性来实现恰好处理一次这样的最高级别的一致性。

Flink基础:Flink运行架构

系统架构

  Flink是一个用于有状态的并行数据流处理的分布式系统。它由多个进程构成,这些进程一般会分布运行在不同的机器上。对于分布式系统来说,面对的常见问题有:集群中资源的分配和管理、进程协调调度、持久化和高可用的数据存储,以及故障恢复。

  对于这些分布式系统的经典问题,业内已有比较成熟的解决方案和服务。所以Flink并不会自己去处理所有的问题,而是利用了现有的集群架构和服务,这样它就可以把精力集中在核心工作——分布式数据流处理上了。Flink与一些集群资源管理工具有很好的集成,比如Apache Mesos、YARN和Kubernetes;同时,也可以配置为独立(stand-alone)集群运行。Flink自己并不提供持久化的分布式存储,而是直接利用了已有的分布式文件系统(比如HDFS)或者对象存储(比如S3)。对于高可用的配置,Flink需要依靠Apache ZooKeeper来完成。

  在本节中,我们将介绍Flink的不同组件,以及在运行程序时它们如何相互作用。我们会讨论部署Flink应用程序的两种模式,并且了解每种模式下分发和执行任务的方式。最后,我们还会解释一下Flink的高可用性模式是如何工作的。

Flink运行时组件

  Flink运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager),以及分发器(Dispatcher)。因为Flink是用Java和Scala实现的,所以所有组件都会运行在Java虚拟机(JVMs)上。每个组件的职责如下:

  • 作业管理器(JobManager)是控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的作业管理器所控制执行。作业管理器会先接收到要执行的应用程序。这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。作业管理器会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。作业管理器会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,作业管理器会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
  • ResourceManager主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。Flink为不同的环境和资源管理工具提供了不同资源管理器(ResourceManager),比如YARN、Mesos、K8s,以及standalone部署。当作业管理器申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给作业管理器。如果ResourceManager没有足够的插槽来满足作业管理器的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放计算资源。
  • 任务管理器(TaskManager)是Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给作业管理器调用。作业管理器就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。任务的执行和插槽的概念会在“任务执行”一节做具体讨论。
  • 分发器(Dispatcher)可以跨作业运行,它为应用提交提供了REST接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个作业管理器。由于是REST接口,所以Dispatcher可以作为集群的一个HTTP接入点,这样就能够不受防火墙阻挡。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。

undefined

上图是从一个较为高层级的视角,来看应用中各组件的交互协作。如果部署的集群环境不同(例如YARN,Mesos,Kubernetes,standalone等),其中一些步骤可以被省略,或是有些组件会运行在同一个JVM进程中。

应用部署

Flink应用程序可以用以下两种不同的方式部署:

框架(Framework)方式

  在这个模式下,Flink应用被打包成一个Jar文件,并由客户端提交给一个运行服务(running service)。这个服务可以是一个Flink的Dispatcher,也可以是一个Flink的作业管理器,或是Yarn的ResourceManager。如果application被提交给一个作业管理器,则它会立即开始执行这个application。如果application被提交给了一个Dispatcher,或是Yarn ResourceManager,则它会启动一个作业管理器,然后将application交给它,再由作业管理器开始执行此应用。

**库(Library)方式 **

在这个模式下,Flink Application 会被打包在一个容器(container) 镜像里,例如一个Docker 镜像。此镜像包含了运行作业管理器和ResourceManager的代码。当一个容器从镜像启动后,它会自动启动ResourceManager和作业管理器,并提交打包好的应用。另一种方法是:将应用打包到镜像后,只用于部署TaskManager容器。从镜像启动的容器会自动启动一个TaskManager,然后连接ResourceManager并注册它的slots。这些镜像的启动以及失败重启,通常都会由一个外部的资源管理器管理(比如Kubernetes)。

框架模式遵循了传统的任务提交方式,从客户端提交到Flink运行服务。而在库模式下,没有运行的Flink服务。它是将Flink作为一个库,与应用程序一同打包到了一个容器镜像。这种部署方式在微服务架构中较为常见。我们会在“运行管理流式应用程序”一节对这个话题做详细讨论。

任务执行

一个TaskManager可以同时执行多个任务(tasks)。这些任务可以是同一个算子(operator)子任务(数据并行),也可以是来自不同算子的(任务并行),甚至可以是另一个不同应用程序的(作业并行)。TaskManager提供了一定数量的处理插槽(processing slots),用于控制可以并行执行的任务数。一个slot可以执行应用的一个分片,也就是应用中每一个算子的一个并行任务。下图展示了TaskManagers,slots,tasks以及operators之间的关系:

undefined

最左边是一个“作业图”(JobGraph),包含了5个算子——它是应用程序的非并行表示。其中算子A和C是数据源(source),E是输出端(sink)。C和E并行度为2,而其他的算子并行度为4。因为最高的并行度是4,所以应用需要至少四个slot来执行任务。现在有两个TaskManager,每个又各有两个slot,所以我们的需求是满足的。作业管理器将JobGraph转化为“执行图”(ExecutionGraph),并将任务分配到四个可用的slot上。对于有4个并行任务的算子,它的task会分配到每个slot上。而对于并行度为2的operator C和E,它们的任务被分配到slot 1.1、2.1 以及 slot 1.2、2.2。将tasks调度到slots上,可以让多个tasks跑在同一个TaskManager内,也就可以是的tasks之间的数据交换更高效。然而将太多任务调度到同一个TaskManager上会导致TaskManager过载,继而影响效率。之后我们会在“控制任务调度”一节继续讨论如何控制任务的调度。

TaskManager在同一个JVM中以多线程的方式执行任务。线程较进程会更轻量级,但是线程之间并没有对任务进行严格隔离。所以,单个任务的异常行为有可能会导致整个TaskManager进程挂掉,当然也同时包括运行在此进程上的所有任务。通过为每个TaskManager配置单独的slot,就可以将应用在TaskManager上相互隔离开来。TaskManager内部有多线程并行的机制,而且在一台主机上可以部署多个TaskManager,所以Flink在资源配置上非常灵活,在部署应用时可以充分权衡性能和资源的隔离。我们将会在第九章对Flink集群的配置和搭建继续做详细讨论。

高可用配置

流式应用程序一般被设计为7 x 24小时运行。所以很重要的一点是:即使出现了进程挂掉的情况,应用仍需要继续保持运行。为了从故障恢复,系统首先需要重启进程、然后重启应用并恢复它的状态。接下来,我们就来了解Flink如何重启失败的进程。

TaskManager故障

  如前所述,Flink需要足够数目的slot,来执行一个应用的所有任务。假设一个Flink环境有4个TaskManager,每个提供2个插槽,那么流应用程序执行的最高并行度为8。如果其中一个TaskManager挂掉了,那么可用的slots会降到6。在这种情况下,作业管理器会请求ResourceManager提供更多的slots。如果此请求无法满足——例如应用跑在一个独立集群——那么作业管理器在有足够的slots之前,无法重启应用。应用的重启策略决定了作业管理器的重启频率,以及两次重启尝试之间的时间间隔。

作业管理器故障

  比TaskManager故障更严重的问题是作业管理器故障。作业管理器控制整个流应用程序的执行,并维护执行中的元数据——例如指向已完成检查点的指针。若是对应的作业管理器挂掉,则流程序无法继续运行。所以这就导致在Flink应用中,作业管理器是单点故障。为了解决这个问题,Flink提供了高可用模式。在原先的作业管理器挂掉后,可以将一个作业的状态和元数据迁移到另一个作业管理器,并继续执行。

  Flink的高可用模式基于Apache ZooKeeper,我们知道,ZooKeeper是用来管理需要协调和共识的分布式服务的系统。Flink主要利用ZooKeeper来进行领导者(leader)的选举,并把它作为一个高可用和持久化的数据存储。当在高可用模式下运行时,作业管理器会将JobGraph以及所有需要的元数据(例如应用程序的jar文件),写入到一个远程的持久化存储系统中。而且,作业管理器会将指向存储位置的指针,写入到ZooKeeper的数据存储中。在执行一个应用的过程中,作业管理器会接收每个独立任务检查点的状态句柄(也就是存储位置)。当一个检查点完成时(所有任务已经成功地将它们的状态写入到远程存储), 作业管理器把状态句柄写入远程存储,并将指向这个远程存储的指针写入ZooKeeper。这样,一个作业管理器挂掉之后再恢复,所需要的所有数据信息已经都保存在了远程存储,而ZooKeeper里存有指向此存储位置的指针。下图描述了这个设计:

undefined

当一个作业管理器失败,所有属于这个应用的任务都会自动取消。一个新的作业管理器接管工作,会执行以下操作:

  • 从ZooKeeper请求存储位置(storage location),从远端存储获取JobGraph,Jar文件,以及应用最近一次检查点(checkpoint)的状态句柄(state handles)
  • 从ResourceManager请求slots,用来继续运行应用
  • 重启应用,并将所有任务的状态,重设为最近一次已完成的检查点

  如果我们是在容器环境里运行应用(如Kubernetes),故障的作业管理器或TaskManager 容器通常会由容器服务自动重启。当运行在YARN或Mesos之上时,作业管理器或TaskManager进程会由Flink的保留进程自动触发重启。而在standalone模式下,Flink并未提供重启故障进程的工具。所以,此模式下我们可以增加备用(standby)的 作业管理器和TaskManager,用于接管故障的进程。我们将会在“高可用配置”一节中做进一步讨论。

Flink中的数据传输

基于信任度的流控制

通过网络连接来发送每条数据的效率很低,会导致很大的开销。为了充分利用网络连接的带宽,就需要进行缓冲了。在流处理的上下文中,缓冲的一个缺点是会增加延迟,因为数据需要在缓冲区中进行收集,而不是立即发送。

  Flink实现了一个基于信任度的流量控制机制,其工作原理如下。接收任务授予发送任务一些“信任度”(credit),也就是为了接收其数据而保留的网络缓冲区数。当发送者收到一个信任度通知,它就会按照被授予的信任度,发送尽可能多的缓冲数据,并且同时发送目前积压数据的大小——也就是已填满并准备发送的网络缓冲的数量。接收者用保留的缓冲区处理发来的数据,并对发送者传来的积压量进行综合考量,为其所有连接的发送者确定下一个信用度授权的优先级。

  基于信用度的流控制可以减少延迟,因为发送者可以在接收者有足够的资源接受数据时立即发送数据。此外,在数据倾斜的情况下,这样分配网络资源是一种很有效的机制,因为信用度是根据发送者积压数据量的规模授予的。因此,基于信用的流量控制是Flink实现高吞吐量和低延迟的重要组成部分。

任务链

Flink采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接。下图所示的算子管道满足这些要求。它由三个算子组成,这些算子的任务并行度都被设为2,并且通过本地转发方式相连接。

undefined

下图展示了管道以任务链方式运行的过程。算子的函数被融合成了一个单一的任务,由一个线程执行。由函数生成的数据通过一个简单的方法调用移交给下一个函数;这样在函数之间直接传递数据,基本上没有序列化和通信成本。

undefined

任务链可以显著降低本地任务之间的通信成本,但也有一些场景,在没有链接的情况下运行管道操作是有意义的。例如,如果任务链中某个函数执行的开销巨大,那就可以将一条长的任务链管道断开,或者将一条链断开为两个任务,从而可以将这个开销大的函数调度到不同的槽(slots)中。下图显示了在没有任务链的情况下相同管道操作的执行情况。所有函数都由独立的单个任务来评估,每个任务都在专有的线程中运行。

undefined

任务链在Flink中默认会启用。在“控制任务链”一节中,我们展示了如何禁用应用程序的任务链,以及如何控制各个算子的链接行为。

事件时间处理

​ 在“时间语义”一节,我们重点强调了时间语义在流处理应用中的重要性,并且解释了处理时间(processing time)和事件时间(event time)的不同。处理时间比较好理解,因为它是基于处理器本地时间的;但同时,它会带来比较混乱、不一致、并且不可重现的结果。相比之下,事件时间语义能够产生可重现且一致的结果,这也是许多流处理场景希望解决的一大难题。但是,与处理时间应用程序相比,事件时间应用程序会更复杂,需要额外的配置。另外,支持事件时间的流处理器,也比纯粹在处理时间中运行的系统内部更为复杂。

  Flink为常见的事件时间处理操作提供了直观且易于使用的原语,同时暴露了表达性很强的API,用户可以使用自定义算子实现更高级的事件时间应用程序。很好地理解Flink的内部时间处理,对于实现这样的高级应用程序会有很大帮助,有时也是必需的。上一章介绍了Flink利用两个概念来支持事件时间语义:记录时间戳(timestamps)和水位线(watermarks)。接下来,我们将描述Flink如何在内部实现并处理时间戳和水位线,进而支持具有事件时间语义的流式应用程序。

时间戳

  由Flink事件时间流应用程序处理的所有记录都必须伴有时间戳。时间戳将数据与特定时间点相关联,通常就是数据所表示的事件发生的时间点。而只要时间戳大致跟数据流保持一致,基本上随着数据流的前进而增大,应用程序就可以自由选择时间戳的含义。不过正如“时间语义”一节中所讨论的,在现实场景中,时间戳基本上都是乱序的,所以采用“事件时间”而非“处理事件”往往会显得更为重要。

  当Flink以事件时间模式处理数据流时,它会根据数据记录的时间戳来处理基于时间的算子。例如,时间窗口算子根据相关时间戳将数据分配给不同的时间窗口。Flink将时间戳编码为16字节的长整型值,并将其作为元数据附加到数据记录中。它的内置运算符会将这个长整型值解释为一个具有毫秒精度的Unix时间戳,也就是1970-01-01-00:00:00.000以来的毫秒数。当然,如果用户进行了自定义,那么运算符可以有自己的解释,例如,可以将精度调整到微秒。

水位线

  除了时间戳,基于事件时间的Flink应用程序还必须支持水位线(watermark)。在基于事件时间的应用中,水位线用于生成每个任务的当前事件时间。基于时间的算子使用这个“当前事件时间”来触发计算和处理操作。例如,一个时间窗口任务(time-window task)会在任务的事件时间超出窗口的关闭边界时,完成窗口计算,并输出计算结果。

在Flink中,水位线被实现为一条特殊的数据记录,它里面以长整型值保存了一个时间戳。水位线在带有时间戳的数据流中,跟随着其它数据一起流动,如下图所示。

undefined

水位线有两个基本属性

  • 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退。
  • 它们与数据的时间戳相关。带有时间戳T的水位线表示,所有后续数据的时间戳都应该大于T。

  上面的第二个属性用于处理带有乱序时间戳的数据流,比如图中时间戳3和5的数据。基于时间的算子任务会收集和处理数据(这些数据可能具有乱序的时间戳),并在事件时间时钟到达某个时刻时完成计算。这个时刻就表示数据收集的截止,具有之前时间戳的数据应该都已经到达、不再需要了;而其中的事件时间时钟,正是由当前接收到的水位线来指示的。如果任务再接收到的数据违反了watermark的这一属性,也就是时间戳小于以前接收到的水位线时,它所属的那部分计算可能已经完成了。这种数据被称为延迟数据(late records)。Flink提供了处理延迟数据的不同方式,我们会在“处理延迟数据”一节中讨论。

  水位线还有一个很有趣的特性,它允许应用程序自己来平衡结果的完整性和延迟。如果水位线与数据的时间戳非常接近,那么我们可以得到较低的处理延迟,因为任务在完成计算之前只会短暂地等待更多数据到达。而同时,结果的完整性可能会受到影响,因为相关数据可能因为迟到而被视为“延迟数据”,这样就不会包含在结果中。相反,非常保守的水位线提供了足够的时间去等待所有数据到达,这样会增加处理延迟,但提高了结果的完整性。

watermark的传递和事件时间

在本节中,我们将讨论算子如何处理水位线。Flink把watermark作为一条特殊的数据来实现,它也会由算子任务接收和发送。任务会有一个内部的时间服务,它会维护定时器,并在收到watermark时触发。任务可以在计时器服务中注册定时器,以便在将来特定的时间点执行计算。例如,窗口算子为每个活动窗口注册一个定时器,当事件时间超过窗口的结束时间时,该计时器将清除窗口的状态。

当任务收到watermark时,将执行以下操作:

  • 任务根据watermark的时间戳更新其内部事件时钟。
  • 任务的时间服务会将所有过期的计时器标识出来,它们的时间小于当前的事件时间。对于每个过期的计时器,任务调用一个回调函数,该函数可以执行计算并发送结果。
  • 任务会发出一个带有更新后的事件时间的watermark。

Flink限制通过DataStream API访问时间戳和watermark。函数不能读取或修改数据的时间戳和watermark,但底层的“处理函数”(process functions)除外,它们可以读取当前处理数据的时间戳、请求算子的当前事件时间,还可以注册定时器。通常的函数都不会暴露这些可以设置时间戳、操作任务事件时间时钟、或者发出水位线的API。而基于时间的数据流算子任务则会配置发送出的数据的时间戳,以确保它们能够与已到达的水位线平齐。例如,窗口计算完成后,时间窗口的算子任务会将窗口的结束时间作为时间戳附加到将要发送出的结果数据上,然后再使用触发窗口计算的时间戳发出watermark。

  现在,让我们更详细地解释一下任务在接收到新的watermark时,如何继续发送watermark并更新其事件时钟。正如我们在“数据并发和任务并发”中所了解的,Flink将数据流拆分为多个分区,并通过单独的算子任务并行地处理每个分区。每个分区都是一个流,里面包含了带着时间戳的数据和watermark。一个算子与它前置或后续算子的连接方式有多种情况,所以它对应的任务可以从一个或多个“输入分区”接收数据和watermark,同时也可以将数据和watermark发送到一个或多个“输出分区”。接下来,我们将详细描述一个任务如何向多个输出任务发送watermark,以及如何通过接收到的watermark来驱动事件时间时钟前进。

  任务为每个输入分区维护一个分区水位线(watermark)。当从一个分区接收到watermark时,它会比较新接收到的值和当前水位值,然后将相应的分区watermark更新为两者的最大值。然后,任务会比较所有分区watermark的大小,将其事件时钟更新为所有分区watermark的最小值。如果事件时间时钟前进了,任务就将处理所有被触发的定时器操作,并向所有连接的输出分区发送出相应的watermark,最终将新的事件时间广播给所有下游任务。

下图显示了具有四个输入分区和三个输出分区的任务如何接收watermark、更新分区watermark和事件时间时钟,以及向下游发出watermark。

undefined

具有两个或多个输入流(如Union或CoFlatMap)的算子任务(参见“多流转换”一节)也会以所有分区watermark的最小值作为事件时间时钟。它们并不区分不同输入流的分区watermark,所以两个输入流的数据都是基于相同的事件时间时钟进行处理的。当然我们可以想到,如果应用程序的各个输入流的事件时间不一致,那么这种处理方式可能会导致问题。

  Flink的水位处理和传递算法,确保了算子任务发出的时间戳和watermark是“对齐”的。不过它依赖一个条件,那就是所有分区都会提供不断增长的watermark。一旦一个分区不再推进水位线的上升,或者完全处于空闲状态、不再发送任何数据和watermark,任务的事件时间时钟就将停滞不前,任务的定时器也就无法触发了。对于基于时间的算子来说,它们需要依赖时钟的推进来执行计算和清除状态,这种情况显然就会有问题。如果任务没有定期从所有输入任务接收到新的watermark,那么基于时间的算子的处理延迟和状态空间的大小都会显著增加。

  对于具有两个输入流而且watermark明显不同的算子,也会出现类似的情况。具有两个输入流的任务的事件时间时钟,将会同较慢的那条流的watermark保持一致,而通常较快流的数据或者中间结果会在state中缓冲,直到事件时间时钟达到这条流的watermark,才会允许处理它们。

时间戳的分配和水位线的产生

我们已经解释了什么是时间戳和水位线,以及它们是如何由Flink内部处理的;然而我们还没有讨论它们的产生。流应用程序接收到数据流时,通常就会先分配时间戳并生成水位线(watermark)。因为时间戳的选择是由不同的应用程序决定的,而且watermark取决于时间戳和流的特性,所以应用程序必须首先显式地分配时间戳并生成watermark。Flink流应用程序可以通过三种方式分配时间戳和生成watermark:

  • 在数据源(source)处分配:当数据流被摄入到应用程序中时,可以由“源函数”SourceFunction分配和生成时间戳和watermark。SourceFunction可以产生并发送一个数据流;数据会与相关的时间戳一起发送出去,而watermark可以作为一条特殊数据在任何时间点发出。如果SourceFunction(暂时)不再发出watermark,它可以声明自己处于“空闲”(idle)状态。Flink会在后续算子的水位计算中,把空闲的SourceFunction产生的流分区排除掉。source的这一空闲机制,可以用来解决前面提到的水位不再上升的问题。源函数(Source Function)在“实现自定义源函数”一节中进行了更详细的讨论。
  • 定期分配:在Flink中,DataStream API提供一个名为AssignerWithPeriodicWatermarks的用户定义函数,它可以从每个数据中提取时间戳,并被定期调用以生成当前watermark。提取出的时间戳被分配给相应的数据,而生成的watermark也会添加到流中。这个函数将在“分配时间戳和生成水位线”一节中讨论。
  • 间断分配:AssignerWithPunctuatedWatermarks是另一个用户定义的函数,它同样会从每个数据中提取一个时间戳。它可以用于生成特殊输入数据中的watermark。与AssignerWithPeriodicWatermarks相比,此函数可以(但不是必须)从每个记录中提取watermark。我们在“分配时间戳和生成水位线”一节中同样讨论了该函数。

  用户定义的时间戳分配函数并没有严格的限制,通常会放在尽可能靠近source算子的位置,因为当经过一些算子处理后,数据及其时间戳的顺序就更加难以解释了。所以尽管我们可以在流应用程序的中段覆盖已有的时间戳和watermark——Flink通过用户定义的函数提供了这种灵活性,但这显然并不是推荐的做法。

状态管理

​ 我们已经知道大多数流应用程序都是有状态的。许多算子会不断地读取和更新状态,例如在窗口中收集的数据、读取输入源的位置,或者像机器学习模型那样的用户定制化的算子状态。 Flink用同样的方式处理所有的状态,无论是内置的还是用户自定义的算子。本节我们将会讨论Flink支持的不同类型的状态,并解释“状态后端”是如何存储和维护状态的。

​ 一般来说,由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态。你可以认为状态就是一个本地变量,可以被任务的业务逻辑访问。下图显示了任务与其状态之间的交互。

undefined

  任务会接收一些输入数据。在处理数据时,任务可以读取和更新状态,并根据输入数据和状态计算结果。最简单的例子,就是统计接收到多少条数据的任务。当任务收到新数据时,它会访问状态以获取当前的计数,然后让计数递增,更新状态并发送出新的计数。

  应用程序里,读取和写入状态的逻辑一般都很简单直接,而有效可靠的状态管理会复杂一些。这包括如何处理很大的状态——可能会超过内存,并且保证在发生故障时不会丢失任何状态。幸运的是,Flink会帮我们处理这相关的所有问题,包括状态一致性、故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑。

  在Flink中,状态始终与特定算子相关联。为了使运行时的Flink了解算子的状态,算子需要预先注册其状态。总的说来,有两种类型的状态:算子状态(operator state)和键控状态(keyed state),它们有着不同的范围访问,我们将在下面展开讨论。

算子状态

算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。下图显示了任务如何访问算子状态。

undefined

Flink为算子状态提供三种基本数据结构:

列表状态

将状态表示为一组数据的列表。

联合列表状态

也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。我们将在后面继续讨论。

广播状态

如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。在保存检查点和重新调整算子并行度时,会用到这个特性。这两部分内容将在本章后面讨论。

键控状态

顾名思义,键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。下图显示了任务如何与键控状态进行交互。

undefined

我们可以将键控状态看成是在算子所有并行任务上,对键进行分区(或分片)之后的一个键值映射(key-value map)。 Flink为键控状态提供不同的数据结构,用于确定map中每个key存储的值的类型。我们简单了解一下最常见的键控状态。

值状态

为每个键存储一个任意类型的单个值。复杂数据结构也可以存储为值状态。

列表状态

为每个键存储一个值的列表。列表里的每个数据可以是任意类型。

映射状态

为每个键存储一个键值映射(map)。map的key和value可以是任意类型。

状态的数据结构可以让Flink实现更有效的状态访问。我们将在“在运行时上下文(RuntimeContext)中声明键控状态”中做进一步讨论。

状态后端

  每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。状态到底是如何被存储、访问以及维护的?这件事由一个可插入的组件决定,这个组件就叫做状态后端(state backend)。状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储。

  对于本地状态管理,状态后端会存储所有键控状态,并确保所有的访问都被正确地限定在当前键范围。 Flink提供了默认的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在JVM堆上。另一种状态后端则会把状态对象进行序列化,并将它们放入RocksDB中,然后写入本地硬盘。第一种方式可以提供非常快速的状态访问,但它受内存大小的限制;而访问RocksDB状态后端存储的状态速度会较慢,但其状态可以增长到非常大。

  状态检查点的写入也非常重要,这是因为Flink是一个分布式系统,而状态只能在本地维护。 TaskManager进程(所有任务在其上运行)可能在任何时间点挂掉。因此,它的本地存储只能被认为是不稳定的。状态后端负责将任务的状态检查点写入远程的持久存储。写入检查点的远程存储可以是分布式文件系统,也可以是数据库。不同的状态后端在状态检查点的写入机制方面有所不同。例如,RocksDB状态后端支持增量的检查点,这对于非常大的状态来说,可以显著减少状态检查点写入的开销。

我们将在“选择状态后端”一节中更详细地讨论不同的状态后端及其优缺点。

调整有状态算子的并行度

  流应用程序的一个常见要求是,为了增大或较小输入数据的速率,需要灵活地调整算子的并行度。对于无状态算子而言,并行度的调整没有任何问题,但更改有状态算子的并行度显然就没那么简单了,因为它们的状态需要重新分区并分配给更多或更少的并行任务。 Flink支持四种模式来调整不同类型的状态。

  具有键控状态的算子通过将键重新分区为更少或更多任务来缩放并行度。不过,并行度调整时任务之间会有一些必要的状态转移。为了提高效率,Flink并不会对单独的key做重新分配,而是用所谓的“键组”(key group)把键管理起来。键组是key的分区形式,同时也是Flink为任务分配key的方式。图3-13显示了如何在键组中重新分配键控状态。

undefined

  具有算子列表状态的算子,会通过重新分配列表中的数据项目来进行并行度缩放。从概念上讲,所有并行算子任务的列表项目会被收集起来,并将其均匀地重新分配给更少或更多的任务。如果列表条目少于算子的新并行度,则某些任务将以空状态开始。下图显示了算子列表状态的重新分配。

undefined

  具有算子联合列表状态的算子,会通过向每个任务广播状态的完整列表,来进行并行度的缩放。然后,任务可以选择要使用的状态项和要丢弃的状态项。下图显示了如何重新分配算子联合列表状态。

undefined

  具有算子广播状态的算子,通过将状态复制到新任务,来增大任务的并行度。这是没问题的,因为广播状态保证了所有任务都具有相同的状态。而对于缩小并行度的情况,我们可以直接取消剩余任务,因为状态是相同的,已经被复制并且不会丢失。下图显示了算子广播状态的重新分配。

undefined

检查点,保存点和状态恢复

  Flink是一个分布式数据处理系统,因此必须有一套机制处理各种故障,比如被杀掉的进程,故障的机器和中断的网络连接。任务都是在本地维护状态的,所以Flink必须确保状态不会丢失,并且在发生故障时能够保持一致。

  在本节中,我们将介绍Flink的检查点(checkpoint)和恢复机制,这保证了“精确一次”(exactly-once)的状态一致性。我们还会讨论Flink独特的保存点(savepoint)功能,这是一个“瑞士军刀”式的工具,可以解决许多操作数据流时面对的问题。

一致的检查点

  Flink的恢复机制的核心,就是应用状态的一致检查点。有状态流应用的一致检查点,其实就是所有任务状态在某个时间点的一份拷贝,而这个时间点应该是所有任务都恰好处理完一个相同的输入数据的时候。这个过程可以通过一致检查点的一个简单算法步骤来解释。这个算法的步骤是:

  • 暂停所有输入流的摄取,也就是不再接收新数据的输入。
  • 等待所有正在处理的数据计算完毕,这意味着结束时,所有任务都已经处理了所有输入数据。
  • 通过将每个任务的状态复制到远程持久存储,来得到一个检查点。所有任务完成拷贝操作后,检查点就完成了。
  • 恢复所有输入流的摄取。

  需要注意,Flink实现的并不是这种简单的机制。我们将在本节后面介绍Flink更精妙的检查点算法。

下图显示了一个简单应用中的一致检查点。

undefined

  上面的应用程序中具有单一的输入源(source)任务,输入数据就是一组不断增长的数字的流——1,2,3等。数字流被划分为偶数流和奇数流。求和算子(sum)的两个任务会分别实时计算当前所有偶数和奇数的总和。源任务会将其输入流的当前偏移量存储为状态,而求和任务则将当前的总和值存储为状态。在图3-17中,Flink在输入偏移量为5时,将检查点写入了远程存储,当前的总和为6和9

从一致检查点中恢复状态

在执行流应用程序期间,Flink会定期检查状态的一致检查点。如果发生故障,Flink将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程。下图显示了恢复过程。

undefined

应用程序从检查点的恢复分为三步:

  • 重新启动整个应用程序。
  • 将所有的有状态任务的状态重置为最近一次的检查点。
  • 恢复所有任务的处理。

  这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置。至于数据源是否可以重置它的输入流,这取决于其实现方式和消费流数据的外部接口。例如,像Apache Kafka这样的事件日志系统可以提供流上之前偏移位置的数据,所以我们可以将源重置到之前的偏移量,重新消费数据。而从套接字(socket)消费数据的流就不能被重置了,因为套接字的数据一旦被消费就会丢弃掉。因此,对于应用程序而言,只有当所有的输入流消费的都是可重置的数据源时,才能确保在“精确一次”的状态一致性下运行。

  从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同。然后它就会开始消费并处理检查点和发生故障之间的所有数据。尽管这意味着Flink会对一些数据处理两次(在故障之前和之后),我们仍然可以说这个机制实现了精确一次的一致性语义,因为所有算子的状态都已被重置,而重置后的状态下还不曾看到这些数据。

  我们必须指出,Flink的检查点保存和恢复机制仅仅可以重置流应用程序的内部状态。对于应用中的一些的输出(sink)算子,在恢复期间,某些结果数据可能会多次发送到下游系统,比如事件日志、文件系统或数据库。对于某些存储系统,Flink提供了具有精确一次输出功能的sink函数,比如,可以在检查点完成时提交发出的记录。另一种适用于许多存储系统的方法是幂等更新。在“应用程序一致性保证”一节中,我们还会详细讨论如何解决应用程序端到端的精确一次一致性问题。

Flink的检查点算法

  Flink的恢复机制,基于它的一致性检查点。前面我们已经了解了从流应用中创建检查点的简单方法——先暂停应用,保存检查点,然后再恢复应用程序,这种方法很好理解,但它的理念是“停止一切”,这对于即使是中等延迟要求的应用程序而言也是不实用的。所以Flink没有这么简单粗暴,而是基于Chandy-Lamport算法实现了分布式快照的检查点保存。该算法并不会暂停整个应用程序,而是将检查点的保存与数据处理分离,这样就可以实现在其它任务做检查点状态保存状态时,让某些任务继续进行而不受影响。接下来我们将解释此算法的工作原理。

  Flink的检查点算法用到了一种称为“检查点分界线”(checkpoint barrier)的特殊数据形式。与水位线(watermark)类似,检查点分界线由source算子注入到常规的数据流中,它的位置是限定好的,不能超过其他数据,也不能被后面的数据超过。检查点分界线带有检查点ID,用来标识它所属的检查点;这样,这个分界线就将一条流逻辑上分成了两部分。分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中。

  我们用一个简单的流应用程序作为示例,来一步一步解释这个算法。该应用程序有两个源(source)任务,每个任务都消费一个增长的数字流。源任务的输出被划分为两部分:偶数和奇数的流。每个分区由一个任务处理,该任务计算所有收到的数字的总和,并将更新的总和转发给输出(sink)任务。这个应用程序的结构如下图所示。

undefined

作业管理器会向每个数据源(source)任务发送一条带有新检查点ID的消息,通过这种方式来启动检查点,如下图所示。

undefined

  当source任务收到消息时,它会暂停发出新的数据,在状态后端触发本地状态的检查点保存,并向所有传出的流分区广播带着检查点ID的分界线(barriers)。状态后端在状态检查点完成后会通知任务,而任务会向作业管理器确认检查点完成。在发出所有分界线后,source任务就可以继续常规操作,发出新的数据了。通过将分界线注入到输出流中,源函数(source function)定义了检查点在流中所处的位置。下图显示了两个源任务将本地状态保存到检查点,并发出检查点分界线之后的流应用程序。

undefined

  源任务发出的检查点分界线(barrier),将被传递给所连接的任务。与水位线(watermark)类似,barrier会被广播到所有连接的并行任务,以确保每个任务从它的每个输入流中都能接收到。当任务收到一个新检查点的barrier时,它会等待这个检查点的所有输入分区的barrier到达。在等待的过程中,任务并不会闲着,而是会继续处理尚未提供barrier的流分区中的数据。对于那些barrier已经到达的分区,如果继续有新的数据到达,它们就不会被立即处理,而是先缓存起来。这个等待所有分界线到达的过程,称为“分界线对齐”(barrier alignment),如下图所示。

undefined

当任务从所有输入分区都收到barrier时,它就会在状态后端启动一个检查点的保存,并继续向所有下游连接的任务广播检查点分界线,如下图所示。

undefined

所有的检查点barrier都发出后,任务就开始处理之前缓冲的数据。在处理并发出所有缓冲数据之后,任务就可以继续正常处理输入流了。下图显示了此时的应用程序。

undefined

  最终,检查点分界线会到达输出(sink)任务。当sink任务接收到barrier时,它也会先执行“分界线对齐”,然后将自己的状态保存到检查点,并向作业管理器确认已接收到barrier。一旦从应用程序的所有任务收到一个检查点的确认信息,作业管理器就会将这个检查点记录为已完成。下图显示了检查点算法的最后一步。这样,当发生故障时,我们就可以用已完成的检查点恢复应用程序了。

undefined

检查点的性能影响

  Flink的检查点算法可以在不停止整个应用程序的情况下,生成一致的分布式检查点。但是,它可能会增加应用程序的处理延迟。Flink对此有一些调整措施,可以在某些场景下显得对性能的影响没那么大。

  当任务将其状态保存到检查点时,它其实处于一个阻塞状态,而此时新的输入会被缓存起来。由于状态可能变得非常大,而且检查点需要通过网络将数据写入远程存储系统,检查点的写入很容易就会花费几秒到几分钟的时间——这对于要求低延迟的应用程序而言,显然是不可接受的。在Flink的设计中,真正负责执行检查点写入的,其实是状态后端。具体怎样复制任务的状态,取决于状态后端的实现方式。例如,文件系统(FileSystem)状态后端和RocksDB状态后端都支持了异步(asynchronous)检查点。触发检查点操作时,状态后端会先创建状态的本地副本。本地拷贝完成后,任务就将继续常规的数据处理,这往往并不会花费太多时间。一个后台线程会将本地快照异步复制到远程存储,并在完成检查点后再回来通知任务。异步检查点的机制,显著减少了任务继续处理数据之前的等待时间。此外,RocksDB状态后端还实现了增量的检查点,这样可以大大减少要传输的数据量。

  为了减少检查点算法对处理延迟的影响,另一种技术是调整分界线对齐的步骤。对于需要非常低的延迟、并且可以容忍“至少一次”(at-least-once)状态保证的应用程序,Flink可以将检查点算法配置为,在等待barrier对齐期间处理所有到达的数据,而不是把barrier已经到达的那些分区的数据缓存起来。当检查点的所有barrier到达,算子任务就会将状态写入检查点——当然,现在的状态中,就可能包括了一些“提前”的更改,这些更改由本该属于下一个检查点的数据到来时触发。如果发生故障,从检查点恢复时,就将再次处理这些数据:这意味着检查点现在提供的是“至少一次”(at-least-once)而不是“精确一次”(exactly-once)的一致性保证。

保存点

  Flink的恢复算法是基于状态检查点的。Flink根据可配置的策略,定期保存并自动丢弃检查点。检查点的目的是确保在发生故障时可以重新启动应用程序,所以当应用程序被显式地撤销(cancel)时,检查点会被删除掉。除此之外,应用程序状态的一致性快照还可用于除故障恢复之外的更多功能。

Flink中一个最有价值,也是最独特的功能是保存点(savepoints)。原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点。 Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作。同样,Flink也不会自动清理保存点。第10章将会具体介绍如何触发和处理保存点。

使用保存点

  有了应用程序和与之兼容的保存点,我们就可以从保存点启动应用程序了。这会将应用程序的状态初始化为保存点的状态,并从保存点创建时的状态开始运行应用程序。虽然看起来这种行为似乎与用检查点从故障中恢复应用程序完全相同,但实际上故障恢复只是一种特殊情况,它只是在相同的集群上以相同的配置启动相同的应用程序。而从保存点启动应用程序会更加灵活,这就可以让我们做更多事情了。

  • 可以从保存点启动不同但兼容的应用程序。这样一来,我们就可以及时修复应用程序中的逻辑bug,并让流式应用的源尽可能多地提供之前发生的事件,然后重新处理,以便修复之前的计算结果。修改后的应用程序还可用于运行A / B测试,或者具有不同业务逻辑的假设场景。这里要注意,应用程序和保存点必须兼容才可以这么做——也就是说,应用程序必须能够加载保存点的状态。
  • 可以使用不同的并行度来启动相同的应用程序,可以将应用程序的并行度增大或减小。
  • 可以在不同的集群上启动同样的应用程序。这非常有意义,意味着我们可以将应用程序迁移到较新的Flink版本或不同的集群上去。
  • 可以使用保存点暂停应用程序,稍后再恢复。这样做的意义在于,可以为更高优先级的应用程序释放集群资源,或者在输入数据不连续生成时释放集群资源。
  • 还可以将保存点设置为某一版本,并归档(archive)存储应用程序的状态。

保存点是非常强大的功能,所以许多用户会定期创建保存点以便能够及时退回之前的状态。我们见到的各种场景中,保存点一个最有趣的应用是不断将流应用程序迁移到更便宜的数据中心上去。

从保存点启动应用程序

  前面提到的保存点的所有用例,都遵循相同的模式。那就是首先创建正在运行的应用程序的保存点,然后在一个新启动的应用程序中用它来恢复状态。之前我们已经知道,保存点的创建和检查点非常相似,而接下来我们就将介绍对于一个从保存点启动的应用程序,Flink如何初始化其状态。

  应用程序由多个算子组成。每个算子可以定义一个或多个键控状态和算子状态。算子由一个或多个算子任务并行执行。因此,一个典型的应用程序会包含多个状态,这些状态分布在多个算子任务中,这些任务可以运行在不同的TaskManager进程上。

  图3-26显示了一个具有三个算子的应用程序,每个算子执行两个算子任务。一个算子(OP-1)具有单一的算子状态(OS-1),而另一个算子(OP-2)具有两个键控状态(KS-1和KS-2)。当保存点创建时,会将所有任务的状态复制到持久化的存储位置。

  保存点中的状态拷贝会以算子标识符(operator ID)和状态名称(state name)组织起来。算子ID和状态名称必须能够将保存点的状态数据,映射到一个正在启动的应用程序的算子状态。从保存点启动应用程序时,Flink会将保存点的数据重新分配给相应的算子任务。

请注意,保存点不包含有关算子任务的信息。这是因为当应用程序以不同的并行度启动时,任务数量可能会更改。

  如果我们要从保存点启动一个修改过的应用程序,那么保存点中的状态只能映射到符合标准的应用程序——它里面的算子必须具有相应的ID和状态名称。默认情况下,Flink会自动分配唯一的算子ID。然而,一个算子的ID,是基于它之前算子的ID确定性地生成的。因此,算子的ID会在其前序算子改变时改变,比如,当我们添加了新的或移除掉一个算子时,前序算子ID改变,当前算子ID就会变化。所以对于具有默认算子ID的应用程序而言,如果想在不丢失状态的前提下升级,就会受到极大的限制。因此,我们强烈建议在程序中为算子手动分配唯一ID,而不是依靠Flink的默认分配。我们将在“指定唯一的算子标识符”一节中详细说明如何分配算子标识符。

Flink维表关联方式

从维表加载方式可以分为:

  • 实时数据库查找关联
  • 预加载维表关联
  • 维表变更日志关联
  • 灵活组合衍生其他关联方式

从维表在Flink内部的使用方式可以分为:

  • 预加载维表
  • 热存储维表
  • 广播维表
  • Temporal Table Function Join (仅SQL可用)

一般我们习惯性把这个操作叫做Data Enrichment(数据扩充)

从处理思路又可以分为四种:

  • 全量预加载+定时刷新
  • 实时查询+缓存刷新
  • 纯实时查询
  • 流式化纬度

TODO 后面我会针对这些纬度表的处理方式进行进一步的整理和补充

整体维表关联设计的指标:

  1. 实现简单性:设计是否足够简单,易于迭代和维护
  2. 吞吐量:性能是否足够好
  3. 维表数据的实时性:维度表的更新是否可以立即对作业可见
  4. 数据库的负载
  5. 内存资源占用
  6. 可拓展性
  7. 结果确定性

Flink 原理与实现:内存管理

如今,大数据领域的开源框架(Hadoop,Spark,Storm)都使用的 JVM,当然也包括 Flink。基于 JVM 的数据分析引擎都需要面对将大量数据存到内存中,这就不得不面对 JVM 存在的几个问题:

  1. Java 对象存储密度低。一个只包含 boolean 属性的对象占用了16个字节内存:对象头占了8个,boolean 属性占了1个,对齐填充占了7个。而实际上只需要一个bit(1/8字节)就够了。
  2. Full GC 会极大地影响性能,尤其是为了处理更大数据而开了很大内存空间的JVM来说,GC 会达到秒级甚至分钟级。
  3. OOM 问题影响稳定性。OutOfMemoryError是分布式计算框架经常会遇到的问题,当JVM中所有对象大小超过分配给JVM的内存大小时,就会发生OutOfMemoryError错误,导致JVM崩溃,分布式框架的健壮性和性能都会受到影响。

所以目前,越来越多的大数据项目开始自己管理JVM内存了,像 Spark、Flink、HBase,为的就是获得像 C 一样的性能以及避免 OOM 的发生。本文将会讨论 Flink 是如何解决上面的问题的,主要内容包括内存管理、定制的序列化工具、缓存友好的数据结构和算法、堆外内存、JIT编译优化(Just In Time Compiler 即时编译器)等。

积极的内存管理

Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上,这个内存块叫做 MemorySegment,它代表了一段固定长度的内存(默认大小为 32KB),也是 Flink 中最小的内存分配单元,并且提供了非常高效的读写方法。你可以把 MemorySegment 想象成是为 Flink 定制的 java.nio.ByteBuffer。它的底层可以是一个普通的 Java 字节数组(byte[]),也可以是一个申请在堆外的 ByteBuffer。每条记录都会以序列化的形式存储在一个或多个MemorySegment中。

Flink 中的 Worker 名叫 TaskManager,是用来运行用户代码的 JVM 进程。TaskManager 的堆内存主要被分成了三个部分:

undefined

  • Network Buffers: 一定数量的32KB大小的 buffer,主要用于数据的网络传输。在 TaskManager 启动的时候就会分配。默认数量是 2048 个,可以通过 taskmanager.network.numberOfBuffers 来配置。(阅读这篇文章了解更多Network Buffer的管理)
  • Memory Manager Pool: 这是一个由 MemoryManager 管理的,由众多MemorySegment组成的超大集合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认情况下,池子占了堆内存的 70% 的大小。
  • Remaining (Free) Heap: 这部分的内存是留给用户代码以及 TaskManager 的数据结构使用的。因为这些数据结构一般都很小,所以基本上这些内存都是给用户代码使用的。从GC的角度来看,可以把这里看成的新生代,也就是说这里主要都是由用户代码生成的短期对象。

注意:Memory Manager Pool 主要在Batch模式下使用。在Steaming模式下,该池子不会预分配内存,也不会向该池子请求内存块。也就是说该部分的内存都是可以给用户代码使用的。不过社区是打算在 Streaming 模式下也能将该池子利用起来。

Flink 采用类似 DBMS 的 sort 和 join 算法,直接操作二进制数据,从而使序列化/反序列化带来的开销达到最小。所以 Flink 的内部实现更像 C/C++ 而非 Java。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。如果要操作多块MemorySegment就像操作一块大的连续内存一样,Flink会使用逻辑视图(AbstractPagedInputView)来方便操作。下图描述了 Flink 如何存储序列化后的数据到内存块中,以及在需要的时候如何将数据存储到磁盘上。

从上面我们能够得出 Flink 积极的内存管理以及直接操作二进制数据有以下几点好处:

  1. 减少GC压力。显而易见,因为所有常驻型数据都以二进制的形式存在 Flink 的MemoryManager中,这些MemorySegment一直呆在老年代而不会被GC回收。其他的数据对象基本上是由用户代码生成的短生命周期对象,这部分对象可以被 Minor GC 快速回收。只要用户不去创建大量类似缓存的常驻型对象,那么老年代的大小是不会变的,Major GC也就永远不会发生。从而有效地降低了垃圾回收的压力。另外,这里的内存块还可以是堆外内存,这可以使得 JVM 内存更小,从而加速垃圾回收。
  2. 避免了OOM。所有的运行时数据结构和算法只能通过内存池申请内存,保证了其使用的内存大小是固定的,不会因为运行时数据结构和算法而发生OOM。在内存吃紧的情况下,算法(sort/join等)会高效地将一大批内存块写到磁盘,之后再读回来。因此,OutOfMemoryErrors可以有效地被避免。
  3. 节省内存空间。Java 对象在存储上有很多额外的消耗(如上一节所谈)。如果只存储实际数据的二进制内容,就可以避免这部分消耗。
  4. 高效的二进制操作 & 缓存友好的计算。二进制数据以定义好的格式存储,可以高效地比较与操作。另外,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对高速缓存更友好,可以从 L1/L2/L3 缓存获得性能的提升(下文会详细解释)。

目前 Java 生态圈提供了众多的序列化框架:Java serialization, Kryo, Apache Avro 等等。但是 Flink 实现了自己的序列化框架。因为在 Flink 中处理的数据流通常是同一类型,由于数据集对象的类型固定,对于数据集可以只保存一份对象Schema信息,节省大量的存储空间。同时,对于固定大小的类型,也可通过固定的偏移位置存取。当我们需要访问某个对象成员变量的时候,通过定制的序列化工具,并不需要反序列化整个Java对象,而是可以直接通过偏移量,只是反序列化特定的对象成员变量。如果对象的成员变量较多时,能够大大减少Java对象的创建开销,以及内存数据的拷贝大小。

Flink支持任意的Java或是Scala类型。Flink 在数据类型上有很大的进步,不需要实现一个特定的接口(像Hadoop中的org.apache.hadoop.io.Writable),Flink 能够自动识别数据类型。Flink 通过 Java Reflection 框架分析基于 Java 的 Flink 程序 UDF (User Define Function)的返回类型的类型信息,通过 Scala Compiler 分析基于 Scala 的 Flink 程序 UDF 的返回类型的类型信息。类型信息由 TypeInformation 类表示,TypeInformation 支持以下几种类型:

  • BasicTypeInfo: 任意Java 基本类型(装箱的)或 String 类型。
  • BasicArrayTypeInfo: 任意Java基本类型数组(装箱的)或 String 数组。
  • WritableTypeInfo: 任意 Hadoop Writable 接口的实现类。
  • TupleTypeInfo: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现。
  • CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
  • PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法。
  • GenericTypeInfo: 任意无法匹配之前几种类型的类。

前六种数据类型基本上可以满足绝大部分的Flink程序,针对前六种类型数据集,Flink皆可以自动生成对应的TypeSerializer,能非常高效地对数据集进行序列化和反序列化。对于最后一种数据类型,Flink会使用Kryo进行序列化和反序列化。每个TypeInformation中,都包含了serializer,类型会自动通过serializer进行序列化,然后用Java Unsafe接口写入MemorySegments。对于可以用作key的数据类型,Flink还同时自动生成TypeComparator,用来辅助直接对序列化后的二进制数据进行compare、hash等操作。对于 Tuple、CaseClass、POJO 等组合类型,其TypeSerializer和TypeComparator也是组合的,序列化和比较时会委托给对应的serializers和comparators。如下图展示 一个内嵌型的Tuple3 对象的序列化过程。

undefined

可以看出这种序列化方式存储密度是相当紧凑的。其中 int 占4字节,double 占8字节,POJO多个一个字节的header,PojoSerializer只负责将header序列化进去,并委托每个字段对应的serializer对字段进行序列化。

Flink 的类型系统可以很轻松地扩展出自定义的TypeInformation、Serializer以及Comparator,来提升数据类型在序列化和比较时的性能。

Flink 提供了如 group、sort、join 等操作,这些操作都需要访问海量数据。这里,我们以sort为例,这是一个在 Flink 中使用非常频繁的操作。

首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,我们把这批 MemorySegment 称作 sort buffer,用来存放排序的数据。

undefined

我们会把 sort buffer 分成两块区域。一个区域是用来存放所有对象完整的二进制数据。另一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的key(key+pointer)。如果需要序列化的key是个变长类型,如String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有key)会被加到第二个区域。

将实际的数据和指针加定长key分开存放有两个目的。第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其他key和pointer。第二,这样做是缓存友好的,因为key都是连续存储在内存中的,可以大大减少 cache miss(后面会详细解释)。

排序的关键是比大小和交换。Flink 中,会先用 key 比大小,这样就可以直接用二进制的key比较而不需要反序列化出整个对象。因为key是定长的,所以如果key相同(或者没有提供二进制key),那就必须将真实的二进制数据反序列化出来,然后再做比较。之后,只需要交换key+pointer就可以达到排序的效果,真实的数据不用移动。

undefined

最后,访问排序后的数据,可以沿着排好序的key+pointer区域顺序访问,通过pointer找到对应的真实数据,并写到内存或外部(更多细节可以看这篇文章 Joins in Flink)。

缓存友好的数据结构和算法

随着磁盘IO和网络IO越来越快,CPU逐渐成为了大数据领域的瓶颈。从 L1/L2/L3 缓存读取数据的速度比从主内存读取数据的速度快好几个量级。通过性能分析可以发现,CPU时间中的很大一部分都是浪费在等待数据从主内存过来上。如果这些数据可以从 L1/L2/L3 缓存过来,那么这些等待时间可以极大地降低,并且所有的算法会因此而受益。

在上面讨论中我们谈到的,Flink 通过定制的序列化框架将算法中需要操作的数据(如sort中的key)连续存储,而完整数据存储在其他地方。因为对于完整的数据来说,key+pointer更容易装进缓存,这大大提高了缓存命中率,从而提高了基础算法的效率。这对于上层应用是完全透明的,可以充分享受缓存友好带来的性能提升。

走向堆外内存

Flink 基于堆内存的内存管理机制已经可以解决很多JVM现存问题了,为什么还要引入堆外内存?

  1. 启动超大内存(上百GB)的JVM需要很长时间,GC停留时间也会很长(分钟级)。使用堆外内存的话,可以极大地减小堆内存(只需要分配Remaining Heap那一块),使得 TaskManager 扩展到上百GB内存不是问题。
  2. 高效的 IO 操作。堆外内存在写磁盘或网络传输时是 zero-copy,而堆内存的话,至少需要 copy 一次。
  3. 堆外内存是进程间共享的。也就是说,即使JVM进程崩溃也不会丢失数据。这可以用来做故障恢复(Flink暂时没有利用起这个,不过未来很可能会去做)。

但是强大的东西总是会有其负面的一面,不然为何大家不都用堆外内存呢。

  1. 堆内存的使用、监控、调试都要简单很多。堆外内存意味着更复杂更麻烦。
  2. Flink 有时需要分配短生命周期的 MemorySegment,这个申请在堆上会更廉价。
  3. 有些操作在堆内存上会快一点点。

Flink用通过ByteBuffer.allocateDirect(numBytes)来申请堆外内存,用 sun.misc.Unsafe 来操作堆外内存。

基于 Flink 优秀的设计,实现堆外内存是很方便的。Flink 将原来的 MemorySegment 变成了抽象类,并生成了两个子类。HeapMemorySegmentHybridMemorySegment。从字面意思上也很容易理解,前者是用来分配堆内存的,后者是用来分配堆外内存和堆内存的。是的,你没有看错,后者既可以分配堆外内存又可以分配堆内存。为什么要这样设计呢?

首先假设HybridMemorySegment只提供分配堆外内存。在上述堆外内存的不足中的第二点谈到,Flink 有时需要分配短生命周期的 buffer,这些buffer用HeapMemorySegment会更高效。那么当使用堆外内存时,为了也满足堆内存的需求,我们需要同时加载两个子类。这就涉及到了 JIT 编译优化的问题。因为以前 MemorySegment 是一个单独的 final 类,没有子类。JIT 编译时,所有要调用的方法都是确定的,所有的方法调用都可以被去虚化(de-virtualized)和内联(inlined),这可以极大地提高性能(MemroySegment的使用相当频繁)。然而如果同时加载两个子类,那么 JIT 编译器就只能在真正运行到的时候才知道是哪个子类,这样就无法提前做优化。实际测试的性能差距在 2.7 倍左右。

Flink 使用了两种方案:

方案1:只能有一种 MemorySegment 实现被加载

代码中所有的短生命周期和长生命周期的MemorySegment都实例化其中一个子类,另一个子类根本没有实例化过(使用工厂模式来控制)。那么运行一段时间后,JIT 会意识到所有调用的方法都是确定的,然后会做优化。

方案2:提供一种实现能同时处理堆内存和堆外内存

这就是 HybridMemorySegment 了,能同时处理堆与堆外内存,这样就不需要子类了。这里 Flink 优雅地实现了一份代码能同时操作堆和堆外内存。这主要归功于 sun.misc.Unsafe提供的一系列方法,如getLong方法:

1
sun.misc.Unsafe.getLong(Object reference, long offset)
  • 如果reference不为空,则会取该对象的地址,加上后面的offset,从相对地址处取出8字节并得到 long。这对应了堆内存的场景。
  • 如果reference为空,则offset就是要操作的绝对地址,从该地址处取出数据。这对应了堆外内存的场景。

这里我们看下 MemorySegment 及其子类的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public abstract class MemorySegment {
// 堆内存引用
protected final byte[] heapMemory;
// 堆外内存地址
protected long address;

//堆内存的初始化
MemorySegment(byte[] buffer, Object owner) {
//一些先验检查
...
this.heapMemory = buffer;
this.address = BYTE_ARRAY_BASE_OFFSET;
...
}

//堆外内存的初始化
MemorySegment(long offHeapAddress, int size, Object owner) {
//一些先验检查
...
this.heapMemory = null;
this.address = offHeapAddress;
...
}

public final long getLong(int index) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit - 8) {
// 这是我们关注的地方,使用 Unsafe 来操作 on-heap & off-heap
return UNSAFE.getLong(heapMemory, pos);
}
else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}
...
}

public final class HeapMemorySegment extends MemorySegment {
// 指向heapMemory的额外引用,用来如数组越界的检查
private byte[] memory;
// 只能初始化堆内存
HeapMemorySegment(byte[] memory, Object owner) {
super(Objects.requireNonNull(memory), owner);
this.memory = memory;
}
...
}

public final class HybridMemorySegment extends MemorySegment {
private final ByteBuffer offHeapBuffer;

// 堆外内存初始化
HybridMemorySegment(ByteBuffer buffer, Object owner) {
super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
}

// 堆内存初始化
HybridMemorySegment(byte[] buffer, Object owner) {
super(buffer, owner);
this.offHeapBuffer = null;
}
...
}

可以发现,HybridMemorySegment 中的很多方法其实都下沉到了父类去实现。包括堆内堆外内存的初始化。MemorySegment 中的 getXXX/putXXX 方法都是调用了 unsafe 方法,可以说MemorySegment已经具有了些 Hybrid 的意思了。HeapMemorySegment只调用了父类的MemorySegment(byte[] buffer, Object owner)方法,也就只能申请堆内存。另外,阅读代码你会发现,许多方法(大量的 getXXX/putXXX)都被标记成了 final,两个子类也是 final 类型,为的也是优化 JIT 编译器,会提醒 JIT 这个方法是可以被去虚化和内联的。

对于堆外内存,使用 HybridMemorySegment 能同时用来代表堆和堆外内存。这样只需要一个类就能代表长生命周期的堆外内存和短生命周期的堆内存。既然HybridMemorySegment已经这么全能,为什么还要方案1呢?因为我们需要工厂模式来保证只有一个子类被加载(为了更高的性能),而且HeapMemorySegment比heap模式的HybridMemorySegment要快。

下方是一些性能测试数据,更详细的数据请参考这篇文章

Segment Time
HeapMemorySegment, exclusive 1,441 msecs
HeapMemorySegment, mixed 3,841 msecs
HybridMemorySegment, heap, exclusive 1,626 msecs
HybridMemorySegment, off-heap, exclusive 1,628 msecs
HybridMemorySegment, heap, mixed 3,848 msecs
HybridMemorySegment, off-heap, mixed 3,847 msecs

总结

本文主要总结了 Flink 面对 JVM 存在的问题,而在内存管理的道路上越走越深。从自己管理内存,到序列化框架,再到堆外内存。其实纵观大数据生态圈,其实会发现各个开源项目都有同样的趋势。比如最近炒的很火热的 Spark Tungsten 项目,与 Flink 在内存管理上的思想是及其相似的。

参考资料

Flink 内存设置思路

Flink内存设置思路分为两个版本

分别是1.9之前和之后的

这里用Flink1.8为例,计算内存的代码位于org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters类的create方法。

按照计算步骤,以taskmanager.heap.size=6g为例子,其他参数保持不动,最终得到的参数如下:

​ -Xms4148m -Xmx4148m -XX:MaxDirectMemorySize=1996m

两块内存加起来是6144m = 6g jvm的设置符合参数。

Flink Dashboard上面显示的是:

​ JVM Heap Size:3.95 GB Flink Managed Memory:2.74 GB

​ JVM (Heap/Non-Heap) Commit: Heap:3.95 GB Non-Heap:141 MB Total:4.09 GB

​ Outside JVM:Capacity:457 MB

​ NetWork: count: xxxxx

计算过程

设容器内存总大小是x

详细看create方法:

1.  cutoff:容器超过3g, 简单可以记成 0.25x. flink为了防止内存溢出,计算的时候先切了一块内存下来不参与后续计算,这块就是cutoff
1
cutoff = Math.max(containerized.heap-cutoff-min, taskmanager.heap.size * containerized.heap-cutoff-ratio)

默认值是600和0.25,所以6g的时候=Math.max(600, 6144*0.25) = 1536m

剩余大小 0.75x6g = 4608m

  1. networkBufMB:简单记成 0.75*0.1x,最大1g

网络buffer使用内存分成新旧版,这里只关注新版,涉及参数:

1
2
3
4
taskmanager.memory.segment-size:32kb
taskmanager.network.memory.fraction:0.1
taskmanager.network.memory.min:64mb
taskmanager.network.memory.max:1g

计算参数:

1
Math.min(taskmanager.network.memory.max,Math.max(taskmanager.network.memory.min, taskmanager.network.memory.fraction * (x - cutoff))

这里的结果就是:Math.min(1g, Math.max(64mb, 0.1 * 4608m) = 460.8m

  1. heapSizeMB:0.75 * 0.9x

taskmanager.memory.off-heap默认为false,主要指的是Flink Managed Memory使用Heap还是Non-heap,默认使用Heap,如果开启使用Non-heap将再减少一部分资源。

计算公式:x - cutoff - networkBufMB

这里就是:4147.2    (注意:这个就是-xmx 4148m)

​ 4. offHeapSizeMB: x - heapSizeMB

就是1996m       (注意:这个就是XX:MaxDirectMemorySize: 1996m)

后续:上面只是一个jvm的参数预估设置,实际设置还与运行中环境有关,TaskManagerServices.fromConfiguration

会计算一个 freeHeapMemoryWithDefrag,计算之前会手动触发gc,然后用Jvm最大内存 - 总内存 + 空闲内存。

这个值可以认为是一个空运行的flink任务剩余的堆内存了。

后面将计算Flink管理的内存,这个指的是Flink Managed Memory Segment: taskmanager.memory.fraction默认是0.7,

被Flink管理的内存就是:freeHeapMemoryWithDefrag * 0.7

undefined

所以虽然6g内存计算出来后,heap是4148,但是在dashbord中显示不足4148, 为3.95G=4044.8, Flink managed内存小于 0.750.90.7 = 2903.04 , dashboard上显示2.74g = 2805.76m

框架运行需要:4148 - 4044.8 = 103.2m,3.95 * 0.7 = 2.765 > 2.74。没有相等,其他的内存使用暂时没有探究了。

Flink Managed内存一般用于批处理作业,流处理作业可以调整 taskmanager.memory.fraction,使得这部分内存用于用户代码。

Non - heap空间一般用于 JVM 的栈空间、方法区等堆外开销外,还包括网络 buffer、batch 缓存、RocksDB

Flink后面内存进行了较大的变动,也就是说之前上面云邪写的内存管理已经过时了

https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_migration.html

这里设置单个taskmanager为14g,taskmanager.memory.managed.fraction为0.5,将会得到以下内容:

1
2
3
-Xmx5721030656  = 5456MB = 5.328g
-=1207959552 = 1152MB = 1.125g
-XX:MaxMetaspaceSize=100663296 = 96MB

可以发现,上面的加起来等于6704MB,远远不足14g,和1.8版本有很大的不同。

再看dashboard:

1
2
3
4
JVM Heap Size:5.19 GB   Flink Managed Memory:6.45 GB
JVM (Heap/Non-Heap) : Heap:5.19 GB Non-Heap:1.33 GB Total:6.52 GB
Outside JVM:Capacity:1.01GB
NetWork: count: xxxxx

可以计算得到6.45+6.52+1.01 = 13.98 等于14

1
2
3
4
5
6
7
8
9
10
11
taskmanager.memory.process.size 设置的是容器的内存大小,等于之前的 taskmanager.heap.size

计算过程在org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils中processSpecFromConfig方法,TaskExecutorProcessSpec类展示了1.10版本整个内存的组成。

计算方法分成3种:
1.指定了taskmanager.memory.task.heap.size和taskmanager.memory.managed.size
见方法:deriveProcessSpecWithExplicitTaskAndManagedMemory
2.指定了taskmanager.memory.flink.size
见方法:deriveProcessSpecWithTotalFlinkMemory
3.指定了taskmanager.memory.process.size(容器环境一般指定这个,决定全局容量)
totalProcessMemorySize = 设置的值 14g, jvmMetaspaceSize = taskmanager.memory.jvm-metaspace.size,默认96m,这个对应参数-XX:MaxMetaspaceSize=100663296。
1
2
3
4
5
jvmOverheadSize:

taskmanager.memory.jvm-overhead.min 192m
taskmanager.memory.jvm-overhead.max 1g
taskmanager.memory.jvm-overhead.fraction 0.1

公式 14g * 0.1 = 1.4g 必须在[192m, 1g]之间,所以jvmOverheadSize的大小是1g

totalFlinkMemorySize = 14g - 1g - 96m = 13216m

frameworkHeapMemorySize:taskmanager.memory.framework.heap.size 默认128m

frameworkOffHeapMemorySize:taskmanager.memory.framework.off-heap.size 默认128m

taskOffHeapMemorySize:taskmanager.memory.task.off-heap.size 默认0

确定好上面这些参数后,就是最重要的三个指标的计算了:

taskHeapMemorySize,networkMemorySize,managedMemorySize

计算分成确定了:taskmanager.memory.task.heap.size还是没确定。

1
2
3
4
5
6
7
8
9
10
11
12
13
确定了taskmanager.memory.task.heap.size
taskHeapMemorySize = 设置值
managedMemorySize = 设置了使用设置值,否则使用 0.4 * totalFlinkMemorySize
如果 taskHeapMemorySize + taskOffHeapMemorySize + frameworkHeapMemorySize + frameworkOffHeapMemorySize + managedMemorySize > totalFlinkMemorySize异常
networkMemorySize 等于剩余的大小,之后还会check这块内存是否充足,可以自己查看对应代码
未设置heap大小
先确定 managedMemorySize = 设置了使用设置值,否则使用 0.4 * totalFlinkMemorySize,这里就是 0.5 * 13216m = 6608 = 6.45g (这里就是dashboard的显示内容)
再确定network buffer大小,这个也是有两种情况,不细说。 [64mb, 1g] 0.1 * totalFlinkMemorySize = 1321.6, 所以是1g
最后剩余的就是taskHeapMemorySize,不能为负数,这里等于 13216 - 6608 - 1024 - 128 - 128 = 5328 = 5.2g (这里约等于dashboard的显示heap大小)
最后jvm的参数的计算过程:
jvmHeapSize = frameworkHeapSize + taskHeapSize = 5328 + 128 = 5456
jvmDirectSize = frameworkOffHeapMemorySize + taskOffHeapSize + networkMemSize = 128 + 1024 = 1152
jvmMetaspaceSize = 96m

全新内存划分

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html

undefined

从计算过程,结合上图可以看出Flink 1.10中的一个内存划分了。

总内存 = Flink 内存 + JVM Metaspace (96m)+ JVM Overhead (计算为0.1 * 全局大小,结果必须在[192m, 1g]之间)

Flink内存被划分成6部分:

1
2
3
4
框架运行需要的Heap和Non Heap,默认都是128m
任务需要的Heap和Non Heap(默认0), Heap是通过计算其他5部分内存,Flink内存剩余得到
网络缓冲 (0.1 * Flink内存,结果必须在[64mb, 1g]之间)
Flink管理内存:0.4 * Flink内存

总结

Flink 1.10之前对内存的划分比较简单,主要就是Heap + Non-Heap,之后对内存做了更细致的切分。

Flink 1.8可以调整taskmanager.memory.fraction 减少Heap中的管理的内存,增大用户代码的内存使用,调整containerized.heap-cutoff-ratio,控制Non-heap空间,这个影响rocksdb。

Flink 1.10可以调整taskmanager.memory.managed.fraction 控制managed内存,这个影响rocksdb,也会影响taskHeap大小,需要衡量。

也可以看到Flink内存模型的变化managed内存位置也发生了变化,作用也有了些许变化。

JVM 主要划分 Heap 和 Non-Heap,Non-Heap又划分为Direct和Native等。

1.8的Non-Heap都是通过XX:MaxDirectMemorySize设置的

1.10的Network buffer在Direct里面,另一部分是Native(包括Managed Memory),主要用于rocksdb,如果使用的是Heap状态后台,可以设置小点,也用于Batch。

多流合并

遇到了一个多流合并的问题,目前给出的方案是:

  1. 分析业务数据源,很多需要多个流的join的场景 是伪命题,用union即可。

  2. union + group by ,在基于key的流中可以取代join。优势: 在join 发生数据倾斜或者反压,很难 checkpoint时,用union可以回避这个问题。

  3. 例如三个流join,可以 tempstream = stream1.join(stream2) ResultStream = tempstream.join(stream3)。语法支持,看了下生成的图,不确定是不是想要的效果。

  4. 将第三点的 join 换成 cogroup操作。这个是社区直播中,提问多流join后 得到的回复

  5. 基于blink引擎,多表join 直接用sql表达出来。select * from table1 a left join table2 b on a.id = b.id left join table3 c on b.id = c.id 这个方案也是在社区直播,提问多流join后 得到的回复。

SQL暂时是不用的,那么只剩下了第一种。

我尝试拆分一下逻辑。

Flink 的算子链机制

“为什么我的 Flink 作业 Web UI 中只显示出了一个框,并且 Records Sent 和Records Received 指标都是 0 ?是我的程序写得有问题吗?”

在 Flink 社区群里经常能看到类似这样的疑问。这种情况几乎都不是程序有问题,而是因为 Flink 的 operator chain ——即算子链机制导致的,即提交的作业的执行计划中,所有算子的并发实例(即 sub-task )都因为满足特定条件而串成了整体来执行,自然就观察不到算子之间的数据流量了。

当然上述是一种特殊情况。我们更常见到的是只有部分算子得到了算子链机制的优化,如官方文档中出现过多次的下图所示,注意 Source 和 map() 算子。

undefined

算子链机制的好处是显而易见的:所有 chain 在一起的 sub-task 都会在同一个线程(即 TaskManager 的 slot)中执行,能够减少不必要的数据交换、序列化和上下文切换,从而提高作业的执行效率。

undefined

逻辑计划中的算子链

对 Flink Runtime 稍有了解的看官应该知道,Flink 作业的执行计划会用三层图结构来表示,即:

  • StreamGraph —— 原始逻辑执行计划
  • JobGraph —— 优化的逻辑执行计划(Web UI 中看到的就是这个)
  • ExecutionGraph —— 物理执行计划

算子链是在优化逻辑计划时加入的,也就是由 StreamGraph 生成 JobGraph 的过程中。那么我们来到负责生成 JobGraph 的 o.a.f.streaming.api.graph.StreamingJobGraphGenerator 类,查看其核心方法 createJobGraph() 的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private JobGraph createJobGraph() {
// make sure that all vertices start immediately
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
setChaining(hashes, legacyHashes, chainedOperatorHashes);

setPhysicalEdges();
// 略......

return jobGraph;
}

可见,该方法会先计算出 StreamGraph 中各个节点的哈希码作为唯一标识,并创建一个空的 Map 结构保存即将被链在一起的算子的哈希码,然后调用 setChaining() 方法,如下源码所示。

1
2
3
4
5
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}
}

可见是逐个遍历 StreamGraph 中的 Source 节点,并调用 createChain() 方法。createChain() 是逻辑计划层创建算子链的核心方法,完整源码如下,有点长。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}

for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}

for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}

List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

byte[] primaryHashBytes = hashes.get(currentNodeId);
OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}

chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}

StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());

setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

if (currentNodeId.equals(startNodeId)) {
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}

config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}

先解释一下方法开头创建的 3 个 List 结构:

  • transitiveOutEdges:当前算子链在 JobGraph 中的出边列表,同时也是 createChain() 方法的最终返回值;
  • chainableOutputs:当前能够链在一起的 StreamGraph 边列表;
  • nonChainableOutputs:当前不能够链在一起的 StreamGraph 边列表。

接下来,从 Source 开始遍历 StreamGraph 中当前节点的所有出边,调用 isChainable() 方法判断是否可以被链在一起(这个判断逻辑稍后会讲到)。可以链接的出边被放入 chainableOutputs 列表,否则放入 nonChainableOutputs 列表。

对于 chainableOutputs 中的边,就会以这些边的直接下游为起点,继续递归调用createChain() 方法延展算子链。对于 nonChainableOutputs 中的边,由于当前算子链的延展已经到头,就会以这些“断点”为起点,继续递归调用 createChain() 方法试图创建新的算子链。也就是说,逻辑计划中整个创建算子链的过程都是递归的,亦即实际返回时,是从 Sink 端开始返回的。

然后要判断当前节点是不是算子链的起始节点。如果是,则调用 createJobVertex()方法为算子链创建一个 JobVertex( 即 JobGraph 中的节点),也就形成了我们在Web UI 中看到的 JobGraph 效果:

undefined

最后,还需要将各个节点的算子链数据写入各自的 StreamConfig 中,算子链的起始节点要额外保存下 transitiveOutEdges。StreamConfig 在后文的物理执行阶段会再次用到。

形成算子链的条件

来看看 isChainable() 方法的代码。 由此可得,上下游算子能够 chain 在一起的条件还是非常苛刻的(老生常谈了),列举如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
  • 上下游算子实例处于同一个 SlotSharingGroup 中(之后再提);

  • 下游算子的链接策略(ChainingStrategy)为 ALWAYS ——既可以与上游链接,也可以与下游链接。我们常见的 map()、filter() 等都属此类;

  • 上游算子的链接策略为 HEAD 或 ALWAYS。HEAD 策略表示只能与下游链接,这在正常情况下是 Source 算子的专属;

  • 两个算子间的物理分区逻辑是 ForwardPartitioner ,可参见之前写过的《聊聊Flink DataStream 的八种物理分区逻辑》;

  • 两个算子间的 shuffle 方式不是批处理模式;

  • 上下游算子实例的并行度相同;

  • 没有禁用算子链。

禁用算子链

用户可以在一个算子上调用 startNewChain() 方法强制开始一个新的算子链,或者调用 disableOperatorChaining() 方法指定它不参与算子链。代码位于 SingleOutputStreamOperator 类中,都是通过改变算子的链接策略实现的。

1
2
3
4
5
6
7
8
9
@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {
return setChainingStrategy(ChainingStrategy.NEVER);
}

@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {
return setChainingStrategy(ChainingStrategy.HEAD);
}

如果要在整个运行时环境中禁用算子链,调用 StreamExecutionEnvironment.disableOperatorChaining() 方法即可。

物理计划中的算子链

在 JobGraph 转换成 ExecutionGraph 并交由 TaskManager 执行之后,会生成调度执行的基本任务单元 ——StreamTask,负责执行具体的 StreamOperator 逻辑。在StreamTask.invoke() 方法中,初始化了状态后端、checkpoint 存储和定时器服务之后,可以发现:

1
2
operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

构造出了一个 OperatorChain 实例,这就是算子链在实际执行时的形态。解释一下OperatorChain 中的几个主要属性。

1
2
3
4
private final StreamOperator<?>[] allOperators;
private final RecordWriterOutput<?>[] streamOutputs;
private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
private final OP headOperator;
  • headOperator:算子链的第一个算子,对应 JobGraph 中的算子链起始节点;
  • allOperators:算子链中的所有算子,倒序排列,即 headOperator 位于该数组的末尾;
  • streamOutputs:算子链的输出,可以有多个;
  • chainEntryPoint:算子链的“入口点”,它的含义将在后文说明。

由上可知,所有 StreamTask 都会创建 OperatorChain。如果一个算子无法进入算子链,也会形成一个只有 headOperator 的单个算子的 OperatorChain。

OperatorChain 构造方法中的核心代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge outEdge = outEdgesInOrder.get(i);
RecordWriterOutput<?> streamOutput = createStreamOutput(
recordWriters.get(i),
outEdge,
chainedConfigs.get(outEdge.getSourceId()),
containingTask.getEnvironment());
this.streamOutputs[i] = streamOutput;
streamOutputMap.put(outEdge, streamOutput);
}

// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
containingTask,
configuration,
chainedConfigs,
userCodeClassloader,
streamOutputMap,
allOps);

if (operatorFactory != null) {
WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
} else {
headOperator = null;
}

// add head operator to end of chain
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);

首先会遍历算子链整体的所有出边,并调用 createStreamOutput() 方法创建对应的下游输出 RecordWriterOutput。然后就会调用 createOutputCollector() 方法创建物理的算子链,并返回 chainEntryPoint,这个方法比较重要,部分代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators) {
List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);

// create collectors for the network outputs
for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
@SuppressWarnings("unchecked")
RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
allOutputs.add(new Tuple2<>(output, outputEdge));
}

// Create collectors for the chained outputs
for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
int outputId = outputEdge.getTargetId();
StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
containingTask,
chainedOpConfig,
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperators,
outputEdge.getOutputTag());
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// 以下略......
}

该方法从上一节提到的 StreamConfig 中分别取出出边和链接边的数据,并创建各自的 Output。出边的 Output 就是将数据发往算子链之外下游的 RecordWriterOutput,而链接边的输出要靠 createChainedOperator() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperator<?>> allOperators,
OutputTag<IN> outputTag) {
// create the output that the operator writes to first. this may recursively create more operators
WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
containingTask,
operatorConfig,
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperators);

// now create the operator and give it the output collector to write its output to
StreamOperatorFactory<OUT> chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);
OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorFactory.createStreamOperator(
containingTask, operatorConfig, chainedOperatorOutput);

allOperators.add(chainedOperator);

WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
}
else {
TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}

// wrap watermark gauges since registered metrics must be unique
chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
return currentOperatorOutput;
}

我们一眼就可以看到,这个方法递归调用了上述 createOutputCollector() 方法,与逻辑计划阶段类似,通过不断延伸 Output 来产生 chainedOperator(即算子链中除了headOperator 之外的算子),并逆序返回,这也是 allOperators 数组中的算子顺序为倒序的原因。

chainedOperator 产生之后,将它们通过 ChainingOutput 连接起来,形成如下图所示的结构。

undefined

最后来看看 ChainingOutput.collect() 方法是如何输出数据流的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public void collect(StreamRecord<T> record) {
if (this.outputTag != null) {
// we are only responsible for emitting to the main input
return;
}
pushToOperator(record);
}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
// we are only responsible for emitting to the side-output specified by our
// OutputTag.
return;
}
pushToOperator(record);
}

protected <X> void pushToOperator(StreamRecord<X> record) {
try {
// we know that the given outputTag matches our OutputTag so the record
// must be of the type that our operator expects.
@SuppressWarnings("unchecked")
StreamRecord<T> castRecord = (StreamRecord<T>) record;
numRecordsIn.inc();
operator.setKeyContextElement1(castRecord);
operator.processElement(castRecord);
}
catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}

可见是通过调用链接算子的 processElement() 方法,直接将数据推给下游处理了。也就是说,OperatorChain 完全可以看做一个由 headOperator 和 streamOutputs组成的单个算子,其内部的 chainedOperator 和 ChainingOutput 都像是被黑盒遮蔽,同时没有引入任何 overhead。

打通了算子链在执行层的逻辑,看官应该会明白 chainEntryPoint 的含义了。由于它位于递归返回的终点,所以它就是流入算子链的起始 Output,即上图中指向 headOperator 的 RecordWriterOutput。

Flink Stream 类型转换

1.Map[DataStream -> DataStream]
调用用户定义的MapFunction对DataStream[T]数据进行处理,形成新的DataStream[T],其中数据格式可能会发生变化,常用作对数据集内数据的清洗和转换。

2.FlatMap[DataStream -> DataStream]
主要对输入的元素处理之后生成一个或者多个元素

3.Filter[DataStream -> DataStream]
该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出,将不符合条件的数据过滤掉

4.KeyBy[DataStream -> KeyedStream]
该算子根据指定的key将输入的DataStream[T]数据格式转换为KeyedStream[T],也就是在数据集中执行Partition操作,将相同的key值的数据放置在相同的分区中。简单来说,就是sql里面的group by

5.Reduce[KeyedStream -> DataStream]
该算子和MapReduce的Reduce原理基本一致,主要目的是将输入的KeyedStream通过传入的用户自定义的ReduceFunction滚动的进行数据聚合处理,其中定义的ReduceFunction必须满足运算结合律和交换律

6.Union[DataStream -> DataStream]
将两个或者多个输入的数据集合并成一个数据集,需要保证两个数据集的格式一致,输出的数据集的格式和输入的数据集格式保持一致

7.Connect, CoMap, CoFlatMap[DataStream -> DataStream]
Connect算子主要是为了合并两种后者多种不同数据类型的数据集,合并后悔保留原来的数据集的数据类型。连接操作允许共享状态数据,也就是说在多个数据集之间可以操作和查看对方数据集的状态。

8.Split[DataStream -> SplitStream]
Split是将一个DataStream数据集按照条件进行拆分,形成两个数据集的过程

split stream要打印,需要转换成DataStream,使用select方法

WindowedStream代表了根据key分组,并且基于WindowAssigner切分窗口的数据流。所以WindowedStream都是从KeyedStream衍生而来的。

而在WindowedStream上进行任何transformation也都将转变回DataStream

1
2
3
4
5
DataStream[MyType] stream = ...
WindowedDataStream[MyType] windowed = stream
.keyBy("userId")
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
DataStream[ResultType] result = windowed.reduce(myReducer)

在key分组的流上进行窗口切分是比较常用的场景,也能够很好地并行化(不同的key上的窗口聚合可以分配到不同的task去处理)。不过当我们需要在普通流上进行窗口操作时,就要用到 AllWindowedStreamAllWindowedStream是直接在DataStream上进行windowAll(...)操作。AllWindowedStream 的实现是基于 WindowedStream 的。Flink 不推荐使用AllWindowedStream,因为在普通流上进行窗口操作,就势必需要将所有分区的流都汇集到单个的Task中,而这个单个的Task很显然就会成为整个Job的瓶颈。

Flink windowStream 进行操作,要使用apply函数,传入windowFunction,不能在使用Process。

join,coGroup,connect,union

join

1 可用于DataStream和DataSet。只能2个DataStream一起join,或者2个DataSet一起join

2 用于DataStream时返回是JoinedStreams ,用于DataSet时返回是JoinOperatorSets

3 用于DataStream时需要与窗口同时使用,语法是:join where equalTo window apply ,用于DataSet时的语法是:join where equalTo with (where是指定第一个输入的分区字段,equalTo是指定第二个输入的分区字段,这2个字段类型需要一致)

4 与SQL中的inner join同义,只输出2个实时窗口内或2个数据集合内能匹配上的笛卡尔积,不能匹配上的不输出。

5 apply方法中或with方法中均可以使用JoinFunction或 FlatJoinFunction处理匹配上的数据对(用于DataStream和DataSet时均可)

6 侧重对2个输入里的 数据对 进行处理,join方法的入参是单个数据

7 可以join2个类型不同的流或join2个类型不同的数据集(比如Tuple2<String, Long> join Tuple2<Long,String>),但是匹配的key或field类型要一致,不然报错(比如where中的String与equalTo中的String匹配才行)

coGroup

1 可用于DataStream和DataSet。只能2个DataStream一起coGroup,或者2个DataSet一起coGroup

2 用于DataStream时返回是CoGroupedStreams,用于DataSet时返回是CoGroupOperatorSets

3 用于DataStream时需要与窗口同时使用,语法是:coGroup where equalTo window apply ,用于DataSet时的语法是:coGroup where equalTo with,

4 把2个实时窗口内或2个数据集合内key相同的数据分组同一个分区,key不能匹配上的数据(只在一个窗口或集合内存在的数据)也分组到另一个分区上。

5 apply方法中或with方法中均可以使用CoGroupFunction对数据分组(用于DataStream和DataSet时均可,无FlatCoGroupFunction)

6 侧重对2个输入的 集合 进行处理,coGroup方法的入参是Iterable类型

7 可以coGroup2个类型不同的流或coGroup2个类型不同的数据集(比如Tuple2<String, Long> join Tuple2<Long,String>),但是匹配的key或field类型要一致,不然报错(比如where中的String与equalTo中的String匹配才行)

connect

1 只能用于DataStream,返回是ConnectedStreams。不能用于DataSet.

2 只能2个流一起connect(stream1.connect(stream2))

3 connect后可以对2个流分别处理(使用CoMapFunction或CoFlatMapFunction)

4 可以connect2个类型不同的流(比如Tuple2<String, Long> connect Tuple2<Long,String>)

union

1 用于DataStream时,返回是Datastream;用于DataSet时,返回是DataSet;

2 可以多个流一起合并(stream1.union(stream2,stream3,stream4)),合并结果是一个新Datastream;只能2个DataSet一起合并,合并结果是一个新DataSet

3 无论是合并Datastream还是合并DataSet,都不去重,2个源的消息或记录都保存。

4 不可以union 2个类型不同的流或union 2个类型不同的数据集

undefined

更多细节

Flink TimerService Timers

API

currentProcessingTime(): Long 返回当前处理时间
**currentWatermark(): Long 返回当前 watermark 的时间戳
**registerProcessingTimeTimer(timestamp: Long)
: Unit 会注册当前 key 的processing time 的定时器。当 processing time 到达定时时间时,触发 timer。
**registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key 的 event time 定时器。当 水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
**deleteProcessingTimeTimer(timestamp: Long)
: Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
**deleteEventTimeTimer(timestamp: Long)**: Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。