超越批处理的世界——流式计算(2)

超越批处理的世界——流式计算(2)

作者:Tyler Akidau

译者:shenghaishxt

原文:https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

介绍

欢迎回来!如果你错过了我之前的帖子——超越批处理的世界——流式计算 102,我强烈建议你先阅读它。上一篇帖子是这一篇的基础,而且在这一篇帖子中,我会假设你已经熟悉了之前的术语和概念。

另外请注意,这篇帖子中包含了大量的动画,因此尝试打印它的人可能会错过一些重要的部分。

那么让我开始狂欢吧。我们简单回顾以下,上个帖子中我们关注着三个主要领域:术语,准确定义我在使用“流式传输”这样的术语时的意思;批处理与流处理,比较两种系统的理论功能,并且假设只有两种事情才能使流处理超越批处理:即正确性和时间推理工具;数据处理模式,是研究批处理和刘处理在处理有界和无界数据时采用的基本方法。

在这篇文章中,我希望能够进一步关注上篇文章中的数据处理模式,但是下面会阐述更加详细的内容以及具体的示例。我们将遍历两个主要部分:

  • 流式计算(1)回顾:简要回顾上一篇中引入的概念,然后增加一个示例来突出显示所做的要点。
  • 流式计算(2):这是流计算(1)的配套部分,详细介绍了在处理无限数据时重要的附加概念,并且将通过示例来解释这些概念。

在我们完成的时候,我们将涉及到强大的无序数据处理的核心原则与概念,这让你能够真正超越经典的批处理。

为了让你了解它们在运行中的意义,我将使用Dataflow中的代码片段(即Google Cloud Dataflow中的API),然后结合动画提供可视化表示。使用Dataflow而不是其他工具(如Spark或是Stiom)的原因是,这些系统都不能够提供我想要表达的东西。好消息是,很多项目正在朝这个方向发展,更好的消息是我们(谷歌)今天提交了一份提案到Apache软件基金会,建议创建Apache Dataflow孵化项目(联合Srtisans、Cloudera、Talend以及其他的一些公司),希望能够围绕强大的无序处理语义来构建更加开放的社区和由数据流模型提供的生态系统。这是个十分有趣的2016年,但是抱歉,我离题了。

这篇文章中缺少了我上次提到的比较部分,我对此感到抱歉。我低估了这篇文章所需要花费的时间。目前,我无法看到任何可以耽搁的部分。如果说有什么事情可以安慰我,那就是我在Strata + Hadoop World Singapore 2015上发表的题为“大规模数据处理的演变”(并且将在6月的Strata + Hadoop World London 2016上发布它的新版本)的演讲,这里的比较部分涵盖了我很多想要解决的内容;而且幻灯片十分精美,在这里使用的话能够增加你的阅读乐趣。

现在,让我们开始流计算吧!

总结和路线图

在流计算101中,我首先澄清了一些术语,从区分有限数据和无限数据开始。有限数据源的大小有限,通常被称作“批处理数据”。无限数据源的大小是无限的,通常被称作“流式”数据。我尽量避免使用批处理和流式传输来修饰数据源,因为这些名称可能是具有误导性的,并且常常受到限制。

然后,我继续解释了批处理和流处理之间的区别:批处理设计是优先考虑有限数据的,而流处理常常考虑到无限数据。我的目标是在描述执行引擎时仅使用批处理和流处理。

在介绍完这些术语之后,我介绍了两个与处理无限数据相关的重要的基本概念。我首先阐述了事件时间(事件发生的时间)和处理时间(事件被系统处理的时间)之间的关键区别。这为Streaming 101中提出的主要论点提供了基础:如果关心事件的实际发生时间,就必须分析与其事件时间相关的数据,而不是它们的处理时间。

然后我介绍了窗口的概念(即按照时间边界切分数据集),这是一种常见的方法,用于对于无限数据源的数据处理,在理论上,无限数据源永远不会消失。窗口策略的一些更简单的例子是固定窗口和滑动窗口。但是更复杂的窗口类型,如会话窗口(其中窗口由数据本身的特征定义,例如捕获每个用户的活动会话窗口,会话窗口之后紧接用户的不活动期)也是比较常见的用法。

除了上一部分中介绍的这两个概念之外,我们现在要仔细研究另外三个新的概念:

  • Watermarks

Watermarks是相对于事件时间的输入完整性的概念。它代表一个时间X,表示所有事件时间小于X的数据都已经到齐。因此,在处理无限数据源的时候,Water可以用作进度的度量。

  • 触发器

触发器是一种由外部条件触发,用于表明何时计算窗口结果的机制。触发器在选择何时将计算结果发送给下游提供了灵活性。随着数据不停地到来,窗口能够产生多次输出。这又开启了能够先提供近似结果的大门,并且能够灵活地应对上游数据的变化(可能是上游的数据修正)或者相对于Watermarks延迟的数据(例如一个移动场景,某人的电话离线,在离线的过程中,电话记录了发生的各种动作和事件时间,然后在重新连接的时候上传这些事件进行处理)。

  • 累积

累积模式定义了同一个窗口中观察到的不同结果之间的关系。这些结果之间可能是完全独立的,也可能存在着重叠。不同的累积模式具有不同的语义和计算成本,能够适用于各种各样的用例。

最后,我们将重新回顾一些旧问题,因为我认为它可以更容易地理解所有这些概念之间的关系,并且在回答无限数据处理中的四个问题时探索新的问题,我提出的所有问题在每个无限数据处理问题中都至关重要:

  • What 计算的结果是什么?

这个问题由管道中的转换来回答。包括计算总和、构建直方图、训练机器学习模型等等。这也是经典批处理回答的问题。

  • Where 在事件时间中的哪个位置计算结果?

这个问题由使用事件时间窗口的数据管道来回答。这包括来自Streaming 101(固定,滑动和会话)窗口的常见示例,似乎没有窗口概念的用例(例如,Streaming 101中描述的时间不可知处理;传统的批处理通常也属于这种类别)和其他的更复杂的窗口类型,例如有时间限制的拍卖。需要注意的是,如果将入口时间指定为记录的事件时间,那么它也可以包含处理时间窗口。

  • When 在处理时间的哪个时刻触发计算结果?

这个问题用Watermarks和触发器来回答。这个主题的变化性非常大,但是最常见的模式是使用Watermarks来描述给定的窗口的输入完成时间,触发器允许提前计算结果(对于在窗口完成之前发出部分具有推测性的结果)和延迟计算结果(Watermarks仅仅是预估完整性,在Watermarks声明的给定窗口全部到达之后还是有可能到达其他隶属于该窗口的输入数据的)。

  • How 怎么修正相关结果?

这个问题由使用的累积类型来回答:丢弃(其中结果是相互独立和不同的),累加(之后的结果建立在先前的结果上),或累加和回收(当前的累加值和先前触发的值一起发送)。

我们将会在之后一一讨论这些问题,我们将引用一些例子来说明,从而清楚地说明哪些概念与What/Where/When/How中的哪个问题有关。

流式计算(1)回顾

首先,让我们回顾以下流式计算101中的一些概念,这次我们将会提供具体的例子来将这些概念讲解得更加具体。

What:变换

经典批处理中的转换回答了一个问题:“计算的结果是什么?”尽管可能你们很多人已经熟悉了经典的批处理,但是我们还是以它为起点,因为经典的批处理是其他概念的基础。

在本节中,我们来看一个例子:计算一个由10个整数组成的数据集中的总和。如果你想理解得更加具体一点的话,可以把它看作是一个游戏,一个包含10个人的团队,将每个独立玩家所得到的分数相加就得到了总体的成绩。可以想象的是,对于计费和监控情况使用也同样适用。

对于每个例子,我将包含一小段剪短的Dataflow Java SDK伪代码片段,以使管道的定义更加具体。由于是伪代码,所以我有时会省略一些细节(例如具体使用的I/O源),以及简化名字(Java中当前的触发器名称非常冗长;为了能够清晰的表示,我将使用更为简单的名称)。除了一些不重要的东西之外(大部分我将在Postscript中列举),它基本上是实际情况中的Dataflow SDK代码。对于那些能够编译运行自己的例子的人,我可以提供代码练习的链接。

如果你熟悉Spark Streaming或者Flink这样的计算引擎,那你在熟悉Dataflow代码的问题上应该比较轻松。接下来请看,在Data中有两个基本的原语:

  • PCollections,表示可能进行并行处理的数据集(可能十分庞大,因此名称开始处是P)。

  • PTransforms,它们被PCollections用来创造新的PCollections。PTransforms能够执行元素的转换,可以将多个元素聚合在一起,或者它们也可以是其他PTransforms的组合。

图1:变换的类型

如果你发现自己感到困惑,或者想要其他的一些参考的话,那么可以看看 Dataflow Java SDK文档。

就我们的例子而言,我们假定我们从一个PCollection>命名为“input”(PCollection由字符串和整数组成,其中String类似于团队的名字,Interger是相应的分数)开始。在实际情况的管道中,通过I/O源读取原始数据(如日志记录)获取输入,将日志记录解析为适当的键/值对,然后转换为PCollection>。为了清楚地表达,我将在第一个例子中包含所有步骤的伪代码。但在之后的例子中,我会删除I/O和解析部分。

因此,对于简单地从I/O源读取数据的管道,解析出团队/分数键值对,然后计算每队的分数总和,我们会有下面的代码:

1
2
3
PCollection<String> raw = IO.read(...);
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input.apply(Sum.integersPerKey());
列表1.计算管道的总和。从I/O源读取键/值数据,其中键是String类型(如团队名字),值是Integer类型(如团队中个人得分)。然后对每个键计算键的总和,然后输出。

对于所有的例子来说,在每个例子关于管道的代码片段(我们即将分析它们)之后,我们将看到一个管道执行的动画示例。更具体的是,我们将在动画中看到一个Key的10个输入数据执行管道的过程。在一个真实的管道中,你能够想象这样成千上万个类似的操作在很多台机器上并行执行。但为了我们的例子,我们需要将事情简单化。

每个动画在两个维度上绘制输入和输出:事件时间(在X轴上)和处理时间(在Y轴上)。因此,管道将按照处理时间维度来执行,从底部到顶部,如加粗的上升白线所示。输入是圆,圆圈内的数字代表特定记录的值。当管道观察到它们时,它们开始变灰并且改变颜色。

当管道处理到某一个值的时候,便累加并记录在State中,最终实现聚合结果作为输出。State和输出由矩形表示,矩形上方不断变化的数字是累加值,矩形覆盖的区域表示当前时刻所有矩形中的数据都已经被处理完毕。对于清单1中的管道,当执行一个典型的批处理的时候,看起来是这样的(请注意:你需要点击下面图片的开始动画,然后动画会一直循环直到你下一次点击):

图2:经典的批处理

由于这是一个批处理管道,它会累加输入的数据,直到所有的输入(由顶部的绿色虚线表示)完毕,这时它会输出51。在这个例子中,由于没有应用任何特定的窗口,所以我们计算了所有事件时间上数据集的和;因此,状态和输出的矩形覆盖了整个X轴。但如果我们想要处理一个无限数据源,那么这种经典的批处理是不够的;我们无法等待输入结束,事实上它根本不会结束。所以我们需要窗口的概念,这是我们在流式计算101中曾提到的。因此,想要回答第二个问题:“在事件时间的哪个位置计算结果?”,我们需要回顾窗口这个概念。

Where:窗口化

正如上一章讨论的那样,窗口是沿着时间对数据源进行分片的过程。常见的窗口策略包括固定窗口、滑动窗口、会话窗口。

图3.窗口类型示例。每个示例显示三个不同的键,图中显示了对齐窗口(适用于整个数据集)和非对其窗口(适用于数据子集)之间的差异。

为了更好地了解实际中的窗口使用,我们将上边提到的整数求和管道设置为固定的长度为两分钟的时间窗口。使用Dataflow SDK,只需要简单地添加一个Window.into转换即可(加粗文本突出显示):

1
2
3
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
.apply(Sum.integersPerKey());
列表2.使用窗口的求和代码

回忆一下,Dataflow提供了一个统一的模型,它能够同时工作在批处理和流处理中,因为实际上批处理只是流处理的一个子集。因此,我们首先在批处理引擎上执行这个管道;这个原理比较简单,当我们切换到流引擎时,它能够与流引擎进行比较。

图4.在批处理引擎上进行基于窗口的求和

和之前一样,输入不断在State上累加,直到输入被完全处理,然后再产生输出。然而,在这种情况,我们得到的不是一个输出而是四个:四个相关的事件时间窗口都会产生输出。

目前,我们重新讨论了流式计算101中引入的两个重要概念:事件时间和处理时间之间的关系,还有窗口的概念。如果我们想进一步的,我们就需要讨论本章开始时提到的新概念:Watermarks、触发器和累积。下面我们开始流式计算(2)。

流式计算(2)

我们刚刚在批处理引擎上观察到了使用窗口时管道的执行情况。在理想情况下,我们希望能够具有较低的延迟,并且我们还希望能够处理无限数据集。所以切换到流式引擎是朝着正确方向迈出的一步。但是,虽然批处理引擎能够知道每个窗口的输入都已经完成(即有界输入源中的所有数据都已经被处理完毕),但是我们目前还缺乏一种实用的方法来确定无界数据集的完整性。让我们进入Watermarks吧。

When:watermarks

Watermarks“在处理时间的维度上,何时计算窗口的结果?”的前半部分的答案。Watermarks是事件时间中输入完整性的时间概念。换句话说,它们是系统根据当前处理数据的事件时间来判断处理的进度和完整性的度量方式(无论数据集是有界的还是无界的,尽管在无界数据集的情况下Watermarks更有用)。

回忆一下流式计算101中的这个图,这里我们稍作修改,其中描述了事件时间和处理时间之间的偏差,大多数现实世界的分布式数据处理系统中的时间偏差也一直在变化。

图5.事件时间进度、偏差和watermarks

图中我所说的那条曲折的红线实际上就是watermark。随着处理时间的推移,它捕捉到了事件时间完整性的过程。在概念上,可以将watermark看作是一个函数,即F(P)-> E,它在处理时间中选取一个点,并在事件时间中对应一个点。(更准确地说,函数地输入实际上是在管道中观察到watermark的点的上游的所有事物的当前状态:输入源、缓冲数据、正在处理的数据等;但从概念上说,将其看作是从处理时间到事件时间的映射更加简单。)事件时间的那个点E是系统认为所有事件时间小于E的数据都到齐了。换句话说,再也不会有事件时间少于E的数据了。根据watermarks的类型,无论是理想的或者是启发式的,这种推断可能是一种严格且有根据的猜测:

  • 理想watermarks:在我们完全了解所有的输入数据的情况下,就可以构建一个理想的watermarks;在这种情况下,没有延迟数据;所有的数据都是提前或者准时到达。
  • 启发式watermarks:对于许多分布式的输入源来说,对输入数据完全了解是不切实际的。这种情况下,最佳选择是提供启发式的watermarks。启发式watermarks使用有关输入的任何信息(分区、分区内的排序、文件增长率等等),以便尽可能地进行进度估计。在许多情况下,这样的watermarks预测十分准确。即便如此,启发式watermarks的使用仍然意味着它有时可能是错误的,这将导致有些数据被判定为延迟数据。我们将在之后触发器的部分讲解如何处理延迟数据。

watermarks是一个有趣但复杂的话题,这里我们所谈论的内容只是冰山一角,未来我将会在新的帖子中进一步讨论它。现在,为了更好地了解watermarks的作用和它的缺点,我们来看使用流引擎的两个例子:从而确定何时在清单2中执行窗口的管道时实现输出。左边的例子使用完美watermarks,右边的使用启发式watermarks。

图6.在流9引擎上使用完美(左)和推测式(右)Watermarks进行基于窗口的求和

在这两种情况下,当Watermarks通过窗口的末尾时,窗口就被触发计算。这两个执行主要区别在于右侧的Watermarks计算中使用的启发式算法没有考虑到9的值,这极大地改变了watermarks的形状。这些例子突出了Watermarks的两个缺点(以及其他完整性的概念),具体是:

  • 太慢

当任何类型的Watermarks由于已知的未处理数据(例如由于网络带宽的限制而缓慢输入的日志)而被延迟时,如果计算结果的唯一依据是Watermarks的触发,那么将会之间导致输出结果的延迟。

这在左边的图表中十分明显,晚到的9阻碍了所有后续窗口的watermarks,尽管这些后续窗口完成得更早。第二个窗口(12:02,12:04)尤为明显,从窗口的第一个值到达到窗口计算输出结果的时间需要近7分钟的时间。这个例子中的启发式watermarks的情况稍微好一些(离输出5分钟的时间),但不要认为启发式watermarks不会受到watermarks延迟的影响;实际上,我只是选用了一个特殊的例子突出了这种对比。

这里最重要的一点是:尽管watermarks提供了一个非常有用的完整性概念,但是从延迟的角度来说,依赖完整性来生成输出通常不是很理想。想象一个仪表板,其中包含有价值的指标,按照小时或者天显示。我们不太可能会想等待整整一小时或者一天才能看到当前窗口的结果;这就是使用经典批处理为这种系统提供数据的痛点之一。相反,随着输入的变化,这些窗口的结果会随着时间的推移而变得越来越完整。

  • 太快

当一个启发式错误地提前到达时,会导致原来没有延迟的数据变成了延迟数据。这是在右边的例子中发生的情况:在第一个窗口的所有输入数据到达之前,watermarks已经超过第一个窗口的末尾,从而导致了不正确的输出5,而不是14。严格的说,这个缺点是启发式watermarks引发的问题,它的启发式意味着它们有时是错误的。因此,如果你关心正确性,那么仅仅依赖watermarks来确定何时实现输出是不够的。

在流式计算101中,我针对无限数据源进行无序处理时,对不足的完整性概念做了一些强调。watermarks太快或太慢的缺点是这些论点的基础。获得低延迟和正确性的同时,就无法同时获取完整性。这时,触发器是解决这些缺点的方案。

When:触发器的奇妙之处在于触发器是个奇妙的东西

触发器是问题第二部分的答案:“在处理时间维度上,何时计算窗口的结果?”触发器用来表示何时根据处理时间触发窗口的计算结果(尽管触发器本身可能会根据其他时间触发,如在事件时间的维度上使用watermarks)。窗口的每个特定的输出都称为窗口的窗格。

用于触发的信号示例包括:

  • watermarks进度,即事件时间进度,它隐藏在图6当中。当watermarks通过窗口的末尾时,输出就会实现。另一个例子是当窗口的生存期快结束时触发垃圾回收,我们将在稍后看到。
  • 处理时间进度,这对于提供定期的更新很有用,因为处理时间(与事件时间不一样)总是一致地、无延迟地进行更新。
  • 元素计数,这对于在窗口中观察到一定数量的元素后触发是有用的。
  • 标点符号,或者是其他依赖于数据的触发器,其中记录的一些特征(如EOF元素或刷新事件)指示应当生成输出。

除了这些基于具体信号触发的简单触发器之外,还存在一些复合触发器,允许我们创建更复杂的触发逻辑。下面是复合触发器的示例:

  • 重复触发器,当结合处理时间触发器的时候特别有效,可以提供定期的更新。
  • 逻辑和触发器,只有在所有子触发器都满足触发条件的时候逻辑和触发器才会触发(如在watermarks通过窗口之后,或者在我们观察到终止符号之后)。
  • 逻辑或触发器,在子触发器的任意一个满足触发条件的时候逻辑或触发器就会触发(如在水印通过窗口之后,或我们观察到终止符号之后)。
  • 序列触发器,它以预定义的顺序触发一系列的子触发器。

为了让触发器的概念更加具体(并且能够给我们提供一些基础),让我们继续前进,将使用的隐式触发器显式化,将图6中的隐式触发器添加到清单2的代码中:

1
2
3
4
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(AtWatermark()))
.apply(Sum.integersPerKey());
列表3.明确设定默认触发器

考虑到这一点以及对触发因素的基本理解,我们便开始着手解决watermarks太慢或者太快的问题。在这两种情况下,我们基本上都希望在watermarks超过窗口之前能够计算窗口的结果,同时提供持续更新的机制(除了在watermarks超过窗口末端的时刻)。所以,我们需要某种重复的触发。那么问题就变成了:我们需要重复触发来做什么?

太慢的情况下(即早期推测的结果),我们可能需要假设任何给定的窗口都能够有稳定的输入数据,因为我们知道(根据定义,窗口处于早期的阶段),我们观察到的窗口输入到目前为止是不完整的。因此,当处理时间提前时(如每分钟一次的时候),周期性触发也许是明智的选择,因为触发的次数不取决于实际观察到的窗口数据量;就算在最坏的情况下,我们只会得到稳定的周期性触发。

太快的情况下(即根据启发式watermarks的延迟数据提供更新结果),这样我们就假设我们的watermarks是基于相对准确的启发式(通常是相当安全的假设)。这种情况下,我们不希望经常看到延迟很久的数据,但是当我们看到的时候需要尽快修改我们的结果。每收到1个延迟数据就触发一次,能够让我们实现快速更新(即任何时候我们看到的延迟数据),但不太可能会给系统带来大的冲击。

请注意,这些只是示例:如果适合的话,我们可以自由选择不同的触发器(或者选择其中一个,或者选择两个,或者什么都不选)。

最后,我们需要安排这些不同触发点的时间:提前、准时、延迟。我们能够使用序列触发器和一个特殊的OrFinally触发器,它安装子触发器,当子触发器触发时终止父触发器。

1
2
3
4
5
6
7
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(Sequence(
Repeat(AtPeriod(Duration.standardMinutes(1)))
.OrFinally(AtWatermark()),
Repeat(AtCount(1))))
.apply(Sum.integersPerKey());
清单4.手动设定提前或延迟触发

然而,这看起来太冗长了。虽然给出了提前、准时、延迟触发的常用模式,但是使用起来不太方便,因此我们在Dataflow中提供了一个定制的(但语义上相当的)API接口,以使指定这样的触发器更加简单明了:

1
2
3
4
5
6
7
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1))))
.apply(Sum.integersPerKey());
清单5:使用提前/延迟API进行提前或延迟触发

在流引擎上执行清单4或清单5中示例的代码(与前面一样,使用理想watermarks和启发式watermarks),如下所示:

图7.使用提前和延迟触发在流处理上执行基于窗口的求和

相对于图6,这个版本有两个明显的改进:

  • 对于第二个窗口中“watermarks太慢”的情况,(12:02,12:04):我们现在每分钟提供一次定期的提前计算。在理想的watermarks示例下差异最为突出,其中首次的输出从近7分钟降到了3.5分钟;而且在启发式watermarks的情况下也得到了明显的改进。这两个版本现在都可以随着时间的推移而稳定地进行计算(能够得到值为7、14、22的窗格),降低了数据输入到得到计算结果之间的延迟。
  • 对于第一个窗口中“启发式watermarks太快”的情况,(12:00,12:02):当值9出现的很晚的时候,我们立即将其合并到一个值为14的修正窗格中。

此时最大的差异是窗口的生命周期。在理想的watermarks案例中,一旦watermarks的结束,我们将再也看不到窗口的任何数据,窗口中的数据不会被处理,可以被安全的回收。在启发式watermarks的情况下,我们仍然需要保留窗口一段时间来处理延迟数据。但是到目前为止,我们的系统仍然没有好的方法知道每个窗口需要保留多长时间。这就是延迟的原因。

When:允许延迟(垃圾收集)

在继续最后一个问题(“如何修正结果?”)之前,我们先来谈谈长期存在的、无序的流处理系统中的一种实际面对的问题:垃圾收集。在图7中的启发式watermarks中,每个窗口都会在示例的整个生命周期内存在;这是必要的,能够使我们在数据到达的时候适当的延迟处理数据。但是,虽然能够保留所有的持续状态一直到数据处理完毕是很好的,但是实际上,在处理无限数据源的时候,这样为给定的窗口无限期地保持状态(包括元数据)是不切实际的;我们最终会将磁盘、内存空间会耗尽。

因此,任何真实、无序的处理系统都必须提供某种方法来设定窗口的生命周期,从而限制正在处理的窗口的生命周期。要做到这一点,一种简洁的方法是在系统内允许的最大延迟时间上定义一个边界,即给定数据的最晚到达时间(相对于watermarks时间)不能超过这个时间;任何在这之后到达的数据都会被丢弃。定义了最大的允许延迟之后,还需要确定窗口的保留时间:直到watermarks超过了窗口的末尾的延迟范围为止。此外,还可以允许系统在观察到任何数据后直接丢弃它们,这就意味着系统不会浪费计算资源。

由于允许延迟和watermarks之间的相互作用有点难以理解,所以我们来看一个例子。我们从清单5/图7中添加一个1分钟的延迟时间范围(请注意,这个特定的范围是严格选择的,因为它很适合图表;对于现实世界中的例子,也许更大的范围比较实用),如清单6所示:

1
2
3
4
5
6
7
8
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.withAllowedLateness(Duration.standardMinutes(1)))
.apply(Sum.integersPerKey());
清单6. 带有最大允许延迟的提前和延迟触发

这个管道的执行类似于下面的图8,我添加了以下特性来突出显示允许最大延迟的影响:

  • 使用粗白线来表示当前的处理时间,表示所有活动窗口的延迟时间范围(在事件发生时)。
  • 一旦watermarks超过了窗口的最大延迟线,该窗口就会关闭,这意味着窗口的所有数据都会被丢弃。我留下一个虚线的矩形,表示窗口关闭时所覆盖的时间范围(在处理时间和事件时间的两个维度),并向右延伸了一个小尾巴,以表示窗口的延迟水平线(用于与watermarks进行对比)。
  • 对于这个图表,我为第一个窗口添加了一个额外的延迟数据6。6是延迟的,但是仍然在允许的延迟范围内,因此6被累加修正计算结果为11。然而,9超过了最大的允许延迟,因此直接被丢弃掉。

图8:在窗口使用提前、延迟触发,且设置了最大允许延迟时间的情况下,在流处理引擎上切分窗口并计算。

关于最大延迟线最后的两个说明:

  • 要非常清楚的是,如果您碰巧使用的是可获得理想watermarks的源数据,那么就不需要处理延迟的数据,而0秒的最大延迟时间是最佳的。这就是我们在图7的理想watermarks部分中看到的。
  • 有一个值得注意的例外情况,即使在使用启发式watermarks时,也需要指定延迟范围,这类似于为有限数目的密钥计算全局聚合(例如,在数据覆盖的时间范围内,按照web浏览器分组,统计网站访问的总次数)。在这种情况下,系统中活动窗口的数量受到使用的键数量的限制。只要键的数量保持在可管理的低水平,就不用通过设置最大允许的延迟时间来限制窗口的生命周期。

接下来让我们讨论第四个问题,这也是最后一个问题。

How:累积类型

当触发器用于单个窗口生成多个窗格时,我们发现自己会遇到最后一个问题:“如何随着时间修正结果?”在我们已经看到的示例中,每个窗格都建立在前面的一个窗格的基础之上。然而,实际上有三种不同的累积方式:

  • 丢弃:每当窗格计算完毕的时候,任何存储的窗口状态都会被丢弃。这意味着每个窗格都独立于以前出现的任何窗格。当下游的消费者本身正在执行某种累积时,丢弃模式是有用的。例如当将整数发送到一个期望自己计算总和的系统时,丢弃模式是有用的,下游将这些数据累加在一起形成最终结果。
  • 累加:如图7所示,每当窗格计算完毕时,任何存储状态都会被保留,并且将来的输入将累积并更新到现有状态中。这意味着每个连续的窗格都是建立在之前的窗格之上的。当以后的结果能够简单地覆盖之前的结果的时候,累加模式十分有用,例如在诸如BigTable或HBase这样的键/值中存储输出的时候。
  • 累加和撤销:它与累加类似,但是在生成新窗格的时候,会为前一个窗格生成一个独立的撤销。撤销(与新的累加结合)本质上是一种明确的表达方式,即“我之前告诉过你计算结果是X,但是我错了。撤销我告诉你的X,然后用Y代替它。”在两种情况下,撤销非常有用:
  1. 当下游的消费者按照不同维度重新分组数据的时候,新的值完全有可能与前一个值不同, 因此最终可能会在不同的组中。在这种情况下,新值不能简单的覆盖旧值;相反,需要将旧值从旧组中删除,然后将新值添加到新组中。
  2. 当使用动态窗口(例如回话窗口,我们将在之后更详细地讲解)时,由于窗口的合并,新值可能会替换多个旧窗口。在这种情况下,很难仅从新窗口中判断哪些旧窗口正在被替换。对旧窗口进行明确的撤销会使得任务变得简单明了。

同时看这两种情况时,每个组的不同语义会更清晰一些。考虑到图7中第二个窗口的三个窗格(事件时间范围是(12:02,12:04)),下表显示了在三种支持的累加模式中每个窗格的值是什么(累加模式是图7中使用的特定模式):

表1.在对比了几种累加模式之后,使用图7中的第2个窗口
  • 丢弃:每个窗格都仅仅包含特定的窗格中到达的值。因此,最终窗格中观察到的并不是总和,而是最后一个窗口的值。但是如果你要总结单独的窗格,会得出正确的结果22。这就是为什么在丢弃模式下,下游消费者在窗格上执行聚合时很有效。
  • 累加:如图7所示,每个窗格都包含了特定窗格中到达的值,然后加上先前窗格中的所有值。因此,观察到的最终值正确地计算了总和22。但是如果你要总结单独的窗格,就可以分别对窗格2和1 的输入分别进行双重和三重计数,然后得到总和是不正确的51。这就是为什么当你简单地用新值覆盖以前的值的时候,累加模式是最有效的:因为新值包含了迄今为止所看到的所有数据。
  • 累加和撤销:每个窗格都包含着一个新的累加值以及撤销了的前一个窗格的值。因此,观察到的最后(非撤销)值以及所有窗格(包括撤销)的总和都提供了正确的答案22。这就是撤销的强大之处。

若要查看丢弃模式的实际操作情况,我们在下面更改清单5:

1
2
3
4
5
6
7
8
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.discardingFiredPanes())
.apply(Sum.integersPerKey());
清单7.丢弃模式与提前和延迟触发

在使用启发式watermarks的流式计算引擎上再次运行会产生如下输出:

图9.流式引擎上的提前/延迟触发的丢弃模式

尽管输出的总体形状与图7中的累积模式差不多,但是丢弃模式中的任何窗格都没有重叠。即每个输出都是独立的。

如果我们想看看它是如何在行动中撤销的。

1
2
3
4
5
6
7
8
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.accumulatingAndRetractingFiredPanes())
.apply(Sum.integersPerKey());
清单8.使用提前/延迟触发的累加和撤销模式

若运行在一个流式引擎上会产生如下输出:

图10.流式引擎上的提前/延迟触发的累加和撤销模式

由于每个窗口的窗格都是重叠的,所以看起来还是乱糟糟的。撤销显示为红色,与重叠的蓝色窗格结合起来产生偏紫的颜色。我在给定的窗格中稍微移动两个输出的值(使用逗号分隔),使它们更容易区分。

将图7(使用累加)和图9(使用丢弃)和图10(使用累加和撤销)放在一起(均为启发式watermarks),下面展示了一个很好的视觉对比。

图11.对比:丢弃模式、累加模式、累加和撤回模式

正如你想象的那样,在存储和计算成本方面,这三种模式从左至右依次递增。为此,累加模式在正确性、延迟和成本之间进行了很好的权衡。

小结

目前我们已经讨论了四个问题:

  • What 计算的结果是什么?我们通过转换回答。
  • Where 在事件时间的哪个位置计算结果?我们通过窗口回答。
  • When 在处理时间的哪个时刻触发计算结果?我们通过Watermarks和触发器回答。
  • How 怎么修正相关结果?我们通过累加模式来回答。

但是我们目前只了解一种窗口类型:基于事件时间的固定窗口。我们从流式计算101中提到了多种窗口,我们今天会讲述其中的两个:基于处理时间的固定窗口、基于事件时间的会话窗口。

When/Where:基于处理时间的窗口

处理时间窗口之所以重要,有两个原因:

  • 对于某些使用情况,如使用情况监控(例如web服务流量QPS),你希望在观察数据的同时分析输入数据流,这种情况下处理时间窗口是合适的方法。
  • 对于事件发生的时间很重要的用例(例如分析用户的行为趋势、计费、评分等),处理时间窗口是错误的,因此我们要区分哪些场景合适。

因此,深入理解处理时间窗口和事件时间窗口之间的差异是值得的,特别是考虑到当今大多数流处理系统都使用了处理时间窗口的情况下。

无论在哪种模型中工作,比如本文介绍的模型,作为第一类概念的窗口是严格地基于事件时间的,可以使用两种方法来实现处理时间窗口:

  • 触发器:忽略事件时间(即使用跨越所有事件时间的全局窗口)并且使用触发器在处理时间中提供的窗口快照。
  • 入口时间:数据到达系统的时间称为入口时间,使用正常的事件时间窗口。实际上,Spark流处理就是这样做的。

请注意,这两种方法在某种程度上有些等价,但是在多级流水线方面有些不同:

在触发器版本中,每个阶段都可以使用处理时间切分窗口,阶段之间是相互独立的。例如窗口X中的数据可能会在窗口X+1或窗口X-1中。

在入口时间版本中,一旦数据被并入窗口X,那么它在整个流水线处理过程中都会一直属于窗口X,这是由于不同的处理阶段使用watermarks同步处理(Dataflow的做法)。对于微批处理(Spark流处理的做法)或其他协调因素在引擎级别协调处理。

正如我一直强调的,处理时间窗口的一大缺点是,当输入的顺序改变时,窗口的内容也会随之改变。为了更加具体地说明这一点,我们将会学习这三种用例:

  • 事件时间窗口
  • 使用触发器的处理时间窗口
  • 使用入口时间的处理时间窗口

我们将每个窗口都应用到两个不同的输入数据集中(共有6种情况)。两个输入数据集会用于完全相同的事件(即发生在相同事件时间的相同的值),但是有不同的顺序。第一组数据集与之前的顺序一致,颜色设为白色;第二组数据集在处理时间上调整了事件的顺序,如图12所示,颜色设为紫色。如果底层复杂的分布式系统已经以不同的顺序播放了东西,那么紫色的例子就是现实中可能发生的另一种方式。

图12.保持值和事件时间不变,在处理时间上移动事件顺序

事件时间窗口

为了建立基线,我们首先比较这两种观察顺序上基于事件时间的启发式watermarks。我们将重用清单5/图7中提前/延迟处理的代码,来获取下面的结果。左边是我们之前看到的,右边是第二个数据集的结果。需要注意的是:尽管输出的整体形状不同(因为处理时间的不同),但是四个窗口的最终结果没有变化:14,22,3,12:

图13.相同输入的两个不同处理时间顺序的事件时间窗口

使用触发器的处理时间窗口

现在我们来比较一下上面介绍的两种处理时间方法。首先,我们来看看触发器方法。使用这种方式需要考虑以下三个方面:

  • 窗口:我们使用全局事件时间窗口,因为本质上我们是以事件时间窗格模拟处理时间窗口。
  • 触发:我们根据期望的处理时间窗口大小,在处理时间的维度上实现周期性触发。
  • 累积类型:我们使用丢弃模式来保持窗格之间彼此独立,从而让每个窗格都类似于一个独立的处理时间窗口。

相应的代码在清单9中;请注意,全局窗口是默认窗口,因此没有具体的窗口覆盖策略:

1
2
3
4
5
PCollection<KV<String, Integer>> scores = input
.apply(Window.triggering(
Repeatedly(AtPeriod(Duration.standardMinutes(2))))
.discardingFiredPanes())
.apply(Sum.integersPerKey());
清单9,在全局事件窗口中使用重用触发器、丢弃模式来模拟处理时间窗口

当流处理引擎运行这两种不同顺序的输入数据集时,结果如图14所示。以下为有趣的笔记:

  • 由于我们使用事件时间窗格来模拟处理时间窗口,所以在处理时间的维度上描绘了窗口,这意味着窗口的宽度是在Y轴上测量的,而不是X轴。
  • 由于处理时间窗口对于输入数据的顺序十分敏感,因此即使事件发生的时间相同,每个“窗口”的结果都是不同的。左边我们可以得到12,21,18,而右边我们得到7,36,4。

图14.使用触发器模拟处理时间“窗口”,处理两个内容相同但顺序不同的数据集

使用入口时间的处理时间窗口

最后,我们来看看处理时间窗口,它是由输入数据的事件时间映射为入口时间所获得的。关于代码,这里由四个值得一提的方面:

  • 时移:当数据到达的时候,它们的事件时间被入口时间覆盖。请注意,我们目前在Dataflow中没有标准的API,尽管在将来我们可能有(因此接下来我们可能会使用伪代码I/O源中的虚构方法来表示此代码)。对于Google Cloud Pub/Sub,只需要在发布消息的时候将消息的timestampLabel留空即可;对于其他的来源,你需要查询相关的源代码文档。
  • 窗口:返回使用标准的固定事件时间窗口。
  • 触发:入口时间提供了计算理想watermarks的能力,因此可以使用默认触发器。这种情况下,当watermarks通过窗口末尾的时候,触发器会隐式触发一次。
  • 累加模式:因为我们的窗口只有一个输出,所以累加模式无关紧要。

实际的代码看起来可能是这样的:

1
2
3
4
5
PCollection<String> raw = IO.read().withIngressTimeAsTimestamp();
PCollection<KV<String, Integer>> input = raw.apply(ParDo.of(new ParseFn());
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
.apply(Sum.integersPerKey());
清单10.显式的默认触发器

在流式引擎的执行如图15所示。当数据到达的时候,为了匹配它们的入口时间(到达时的处理时间),于是更新它们的事件时间,导致在理想watermarks线上向右水平移动。关于此图有趣的笔记如下:

  • 与其他处理时的窗口示例一样,当输入顺序变化时,即使输入的值和事件时间不变,我们也会得到不一样的结果。
  • 与其他示例不同的是,窗口在事件时间的维度上(X轴)被重新划分了。尽管如此,他们并不是真正的事件时间窗口;我们简单地将处理时间映射到事件时间上,用新的记录代替输入的原始记录,而事件的时间是pipeline第一次收到数据的时间。
  • 尽管如此,由于使用了watermarks,在之前处理时间示例触发的相同时间,触发器仍然会触发。而且,产生的输出值仍与图14中的示例相同,正如预测的那样:左边是12,21,18,右边是7,36,4。
  • 当使用入口时间的时候,理想watermarks是可行的,因此实际的watermarks与理想的watermarks相匹配,以斜率1向右上方延伸。

图15.使用入口时间的处理时间窗口,处理两个内容相同但顺序不同的数据集

虽然不同的方法实现处理时间窗口很有趣,但是最重要的是我们从上一篇文章就一直关注的东西:事件时间窗口与顺序无关,至少在极限情况下是这样的(实际上在输入完成之前,处理过程中的窗格可能会有所不同);但处理时间窗口不是这样的。如果你关心事件实际发生的时间,必须使用事件时间窗口,否则你的计算结果是毫无意义的。

Where:会话窗口

现在我们非常非常非常接近完成这些示例。如果你已经走到了这一步,那么你是一位非常耐心的读者。好消息是你的耐心并不值得。现在我们将要看看我一个最喜欢的特性:动态的、由数据驱动的窗口,称为会话窗口。别紧张,让我们继续吧。

会话是一种特殊类型的窗口,它捕获数据中的一段活动,如果一段时间不活动,那么窗口会终止。这在数据分析中是有用的,因为它们提供某个用户在特定时间范围内活动。这样能够让活动的相关性更易分析,根据会话的长度来判断用户的参与水平。

从窗口的视角来看,会话窗口在两个方面是很有趣的:

  • 它们是数据驱动窗口的一个示例:窗口的位置和大小由输入数据本身来决定,而不是根据像固定窗口或滑动窗口这样预先定义的模式。
  • 他们也是未对齐窗口的示例,即窗口针对数据的特定子集(如每个用户)进行切分,而不是整个数据集,这与固定窗口和滑动窗口等对齐的窗口形成了对比,因为这些窗口通常对整个数据进行切分。

对于一些用例,可以提前在一个会话中用公共标识符来标记数据(如一个在线且可以定时发出ping的视频播放器;对于每次观看都分配一个会话ID,所有的ping都可以添加这个会话ID)。这种情况下的会话更容易构建,因为它本质上是按键分组的一种形式。

然而在更一般的情况下(即实际会话本身之前并不知道),会话只能根据数据的位置单独构建。这在处理无序数据的时候会变得更为困难。

提供一般情况的会话支持的关键在于,根据定义,完整的会话窗口是一组较小的重叠窗口的组合,每个窗口都包含了单个的记录,序列中的每个记录都记录了与下一个记录之间的间隔,并且这些间隔不允许超过预先的定义。因此,即使会话中的数据是无序的,我们仍然可以简单地合并各个数据的重叠窗口来构建最终会话。

图16.未合并的原始会话窗口和合并的会话窗口

让我们看另外一个示例,使用清单8中启用了撤销的提前/延迟代码,修改为会话窗口:

1
2
3
4
5
6
7
8
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.accumulatingAndRetractingFiredPanes())
.apply(Sum.integersPerKey());
清单11.基于会话窗口,提前和延迟触发,使用撤销模式

在流引擎上运行,你会得到如图17所示的结果:

图17.基于会话窗口,提前和延迟触发,使用撤销模式,在流引擎上执行

这里有很多事情要做,所以我会引导你看看一些东西:

  • 当遇到第一个值为5的记录的时候,它被放置到一个单一的原始会话窗口中,这个窗口从值为5这个记录的事件时间开始,窗口宽度为会话窗口的超时时长,如超时时长为1分钟,就那么会话窗口的宽度就是1分钟。我们在后面遇到的任何与该窗口重叠的窗口都应该属于这个会话的一部分,并且应该合并到这个窗口中。
  • 第二个到达的记录是7,因为它不与5的窗口重叠,所以它被放置在自己的原始会话窗口中。
  • 与此同时,watermarks已经通过第一个窗口的末尾。因此,在12:06之前,值5被看作是准时的结果。此后不久,当处理时间刚好是12:06的时候进行触发,第二个窗口也被看作是值为7的推测结果。
  • 接下来我们观察一系列记录,3,4和3,这三个会话窗口是相互重叠的。所以它们才会全部合并在一起,并且在12:07时提前触发,发出一个值为10的单个窗口。
  • 当8到达不久之后,它与具有值7和具有值10的窗口重叠。这三个窗口合并在一起,形成具有值25的新组合会话。当watermarks通过这个会话的末尾的时候,它实现了一个新值为25的新会话,并且收回了之前发出的两个窗口:7和10。
  • 当值9延迟到达时,将其与值5和值25一起放入一个更大的会话39中。值39和5、25的窗口都会由迟到的延迟触发器立即发出。

这些都是非常强大的工具。真正可怕的是,在窗口的帮助下,我们可以非常容易得把流处理的维度分解成不同的可组合的部分。最后,你可以将更多的注意力集中在业务逻辑上,而不是花很多时间去处理数据格式的问题。

如果你不相信,请查看这篇博客——介绍如何在Spark流处理中手动建立会话(注意,这不是为了指责他们,Spark的工作人员已经做了足够好的工作)。这是相当复杂的,他们甚至没有做适当得事件时间会话,或是提供随机或延迟的触发,也没有撤回。

本文的结尾

我们完成了许多示例,此处应有掌声!你现在已经充分沉浸在强大的流式处理当中,并且准备好去做一些令人惊叹的事情。但在离开之前,我希望快速回顾一下我们学习过的内容,以免你在匆忙之间忘记。首先,以下是我们学过的主要概念:

  • 事件时间与处理时间:最重要的区别是前者是事件的发生时间,后者是事件被你的数据处理系统观察到的时间。
  • 窗口:通常使用的方法是在时间边界进行切分来管理无限数据(通过处理时间或是事件时间,尽管我们将数据流模型中窗口的定义缩小为仅表示事件时间)。
  • watermarks:事件时间进度的概念,提供了一种在无序数据上运行无序处理系统的方法,它可以推理完整性。
  • 触发器:用于精确指定输出结果的机制,它对于特定的用例是有意义的。
  • 累加:单个窗口被多次触发计算,累加能够随着触发持续修正窗口结果。

其次,这四个问题用于建立我们的探索之旅(我承诺我不会再让你读这个了):

  • What 计算的结果是什么?= 转换
  • Where 在事件时间的哪个位置计算结果?= 窗口
  • When 在处理时间的哪个时刻触发计算结果?= watermarks+触发器
  • How 怎么修正相关结果?= 累加模式

最后,为了推动流处理带来的灵活性(因为最终,实际上我们就是这么做的:在处理数据的各个要素中取得平衡,如正确性、延迟和成本),回顾之前所说,只通过少量的代码修改,处理相同的数据集从而得到变化的输出:

图18.在同一个输入数据集上的九种输出变化

感谢你的耐心与兴趣,让我们下次再见!