网站设计远程培训,设计页面ui,常见的关键词,cc网站域名注册6.1 时间语义
6.1.1 Flink中的时间语义 对于一台机器而言#xff0c;时间就是系统时间。但是Flink是一个分布式处理系统#xff0c;多台机器“各自为政”#xff0c;没有统一的时钟#xff0c;各自有各自的系统时间。而对于并行的子任务来说#xff0c;在不同的节点…6.1 时间语义
6.1.1 Flink中的时间语义 对于一台机器而言时间就是系统时间。但是Flink是一个分布式处理系统多台机器“各自为政”没有统一的时钟各自有各自的系统时间。而对于并行的子任务来说在不同的节点系统时间就会有所差异。 我们知道一个集群有JobManager作为管理者是不是让它统一向所有 TaskManager 发送同步时钟信号就行了呢这也是不行的。因为网络传输会有延迟而且这延迟是不确定的所以 JobManager 发出的同步信号无法同时到达所有节点想要拥有一个全局统一的时钟在分布式系统里是做不到的。 另一个麻烦的问题是在流式处理的过程中数据是在不同的节点间不停流动的这同样也会有网络传输的延迟。例如上游任务在 8 点 59 分 59 秒发出一条数据到下游要做窗口计算时已经是 9 点01 秒了那这条数据到底该不该被收到 8 点~9 点的窗口呢 流式数据处理过程事件发生-生成数据-进入分布式消息队列-源算子读取-转换算子窗口算子做处理-输出算子输出 两个重要的时间点数据的产生时间——事件时间转换算子窗口算子处理的事件——处理时间 我们在定义窗口操作时到底以哪种时间作为衡量标准就是所谓的时间语义。
1、处理时间
执行处理操作的机器的系统时间
2、事件时间
事件在对应设备上发生的时间也就是数据生成的时间 举例用户在手机上点击某个按钮生成点击事件点击时手机上的时间是85959数据传送到某个节点进行计算处理时的时间为90001。 85959——事件时间 90001——处理时间 如果我们以事件时间为准则这条数据属于8——9点如果我们以处理时间为准则这条数据属于9——10点 在实际应用中由于分布式系统中网络传输延迟的不确定性数据达到的顺序往往是乱序的。例如我现在以事件时间为准进行统计每一个小时的点击量。现在有三个点击事件事件时间分别为 a——85000、b——85959、 c——90001但是到达时间分别为90200、090100、085800。可以看到c事件最先到达。那么当窗口接收到c事件时c事件的事件时间是90001这时窗口认为现在已经过了9点了。应该马上统计8——9点的点击量。但是a、b事件由于延迟在c事件到达之后才到达。导致没有被统计到8——9点的点击量中。 所以还不能简单的使用事件时间来当作时钟还需要用另外的标志来表示事件时间的进展。这个标志我们称之为“水位线”。 6.1.2 哪种语义更重要
两种语义都有各自适用的场景。通常来说处理时间是计算效率的衡量标准而事件时间更符合业务的计算逻辑。
处理时间一般用在实时性极高但结果准确性要求不高的的场景。事件时间语义是以一定延迟为代价换来了处理结果的正确性。 除了事件时间和处理时间Flink 还有一个“摄入时间”Ingestion Time的概念它是指数据进入 Flink 数据流的时间也就是 Source 算子读入数据的时间。摄入时间相当于是事件时间和处理时间的一个中和它是把 Source 任务的处理时间当作了数据的产生时间添加到数据里。这种时间语义可以保证比较好的正确性同时又不会引入太大的延迟。它的具体行为跟事件时间非常像可以当作特殊的事件时间来处理。 6.2 水位线
6.2.1 事件时间和窗口
前面已经讲过一个数据产生的时刻就是流处理中事件触发的时间点这就是“事件时间”。有时候我们不是来一个数据就处理输出而是要计算一段时间内的数。比如实时统计每个小时的点击量。例如8——9点的点击量需要等数据到齐了才能统计输出。那么这个8——9点就是一个窗口。而这里1个小时就是窗口的大小。
6.2.2 什么是水位线 只通过事件时间来判断是否一个窗口的数据已经到齐是不行的。我们可以基于事件时间去自定义一个时钟用来表示当前时间的进展。例如我们定义一个时钟这个时钟的时间逻辑是比事件时间晚5分钟。当一个数据过来它的事件时间是90000这时窗口会认为是85500。这时窗口认为还没有到9点所以8——9点的窗口统计还不到时间。会再等等等收到大于或等于90500的数据时才会进行统计。这样如果有事件时间为85800的数据在90400才到来时也能够被统计到8——9点的窗口中。因为90400的窗口时间是85900并没有到9点。 我们定义的这个时钟是用来衡量事件时间进展的是一个逻辑时钟。
但仅仅通过定义一个逻辑时钟还不够。还存在以下问题
当窗口聚合时要攒一批数据才会输出结果那么给下游的数据就会变少时间进度的控制就不够精细了数据向下游任务传递时一般只能传输给一个子任务除广播外这样其他的并行子任务的时钟就无法推进了不能进行窗口计算。
解决办法
在数据流中加入一个时钟标记记录当前的事件时间这个标记可以直接广播到下游当下游任务收到这个标记就可以更新自己的时钟了。由于类似于水流中用来做标志的记号在 Flink 中这种用来衡量事件时间Event Time进展的标记就被称作**“水位线”Watermark**。
1、有序流中的水位线 在理想状态下数据应该按照它们生成的先后顺序、排好队进入流中这样的话我们从每个数据中提取时间戳就可以保证总是从小到大增长的从而插入的水位线也会不断增长、事件时钟不断向前推进。 实际应用中如果当前数据量非常大可能会有很多数据的时间戳是相同的这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同同时涌来的数据时间差会非常小比如几毫秒往往对处理计算也没什么影响。所以为了提高效率一般会每隔一段时间生成一个水位线这个水位线的时间戳就是当前最新数据的时间戳。这里周期时间是指处理时间系统时间而不是事件时间 2、乱序流中的水位线 我们知道在分布式系统中数据在节点间传输会因为网络传输延迟的不确定性 导致顺序发生改变这就是所谓的“乱序数据”。这里所说的“乱序”out-of-order是指数据的先后顺序不一致主要就是基于数据的产生时间而言的。 最直观的想法自然是跟之前一样我们还是靠数据来驱动每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序所以有可能新的时间戳比之前的还小如果直接将这个时间的水位线再插入我们的“时钟”就回退了——水位线就代表了时钟时光不能倒流所以水位线的时间戳也不能减小。解决思路也很简单我们插入新的水位线时要先判断一下时间戳是否比之前的大否则就不再生成新的水位线。 如果考虑到大量数据同时到来的处理效率我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳需要插入水位线时就直接以它作为时间戳生成新的水位线。这样做尽管可以定义出一个事件时钟却也会带来一个非常大的问题我们无法正确处理“迟到”的数据。为了了让窗口能够正确收集到迟到的数据我们也可以等上 2 秒也就是用当前已有数据的最大时间戳减去 2 秒就是要插入的水位线的时间戳 如果仔细观察我们可以知道这种“等 2 秒”的策略其实并不能处理所有的乱序数据。因为有时候不知道最大延迟是多少。当你设置了等10秒但是这时有条数据晚了20秒就会被遗漏丢弃。所以这个时候我们需要去单独处理“迟到”的数据。后面会讲解这种情况的处理。 需要注意的地方 1.由于水位线是周期性生成的所以插入的位置不一定是在时间戳最大的数据后面。 2.这里一个窗口所收集的数据并不是之前所有已经到达的数据。因为数据属于哪个窗口是由数据本身的时间戳决定的一个窗口只会收集真正属于它的那些数据。 例如上图中尽管水位线 W(20)之前有时间戳为 22 的数据到来但是10~20 秒的窗口中也不会收集这个数据进行计算依然可以得到正确的结果。 3、水位线的特性 水位线是插入到数据流中的一个标记可以认为是一个特殊的数据 水位线主要的内容是一个时间戳用来表示当前事件时间的进展 水位线是基于数据的时间戳生成的 水位线的时间戳必须单调递增以确保任务的事件时间时钟一直向前推进 水位线可以通过设置延迟来保证正确处理乱序数据 一个水位线 Watermark(t)表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了之后流中不会出现时间戳 t’ ≤ t 的数据
6.2.3 如何生成水位线
生成水位线其实就是定义我们的时钟的逻辑。
1、水位线生成的总体原则 我们知道完美的水位线是“绝对正确”的也就是一个水位线一旦出现就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可及我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据我们该怎么做呢一个字等。由于网络传输的延迟不确定为了获取所有迟到数据我们只能等待更长的时间。作为筹划全局的程序员我们当然不会傻傻地一直等下去。那到底等多久呢如果等太久那很多迟到的数据基本不会遗漏但是程序输出延迟会增加。如果等待时间短那迟到的数据会被遗漏结果的准确性难以保证。 所以水位线生成的总体原则权衡低延迟和结果正确性
常用解决方案
a.需要对相关领域有一定的了解了根据业务来定夺。 b.可以单独创建一个 Flink 作业来监控事件流建立概率分布或者机器学习模型学习事件的迟到规律。得到分布规律之后就可以选择置信区间来确定延迟作为水位线的生成策略了。例如如果得到数据的迟到时间服从μ1σ1 的正态分布那么设置水位线延迟为 3 秒就可以保证至少 97.7%的数据可以正确处理。 c.对迟到数据单独处理
2、水位线生成策略 在Flink的DataStream API中有一个单独用于生成水位线的方法。为流中的数据分配时间戳并生成水位线。 public SingleOutputStreamOperatorT assignTimestampsAndWatermarks(WatermarkStrategyT watermarkStrategy)数据本身不是有时间戳吗为什么还要为数据分配时间戳呢 这是因为原始的时间戳只是写入日志数据的一个字段如果不提取出来并明确把它分配给数据 Flink 是无法知道数据真正产生的时间的。 参数
watermarkStrategy: 水位线策略 WatermarkStrategy继承了TimestampAssignerSupplier——时间分配器和WatermarkGeneratorSupplier——水位线生成器 public interface WatermarkStrategyT extends TimestampAssignerSupplierT, WatermarkGeneratorSupplierTTimestampAssigner主要负责从流中数据元素的某个字段中提取时间戳并分配给元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator主要负责按照既定的方式 基于时间戳生成水位线。在WatermarkGenerator 接口中主要又有两个方法onEvent()和 onPeriodicEmit()
onEvent每个事件数据到来都会调用的方法它的参数有当前事件、时间戳 以及允许发出水位线的一个 WatermarkOutput可以基于事件做各种操作
onPeriodicEmitonPeriodicEmit周期性调用的方法可以由 WatermarkOutput 发出水位线。周期时间为处理时间可以调用环境配置的.setAutoWatermarkInterval()方法来设置默认为
200ms
3、Flink内置水位线生成器 atermarkStrategy 这个接口是一个生成水位线策略的抽象让我们可以灵活地实现自己的需求 Flink提供了内置的水位线生成器WatermarkGenerator不仅开箱即用简化了编程而且也为我们自定义水位线策略提供了模板 1有序流
对于有序流主要特点就是时间戳单调增长Monotonously Increasing Timestamps所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景。简单来说就是直接拿当前最大的时间戳作为水位线就可以了
WatermarkStrategy.forMonotonousTimestamps()2乱序流
由于乱序流中需要等待迟到数据到齐所以必须设置一个固定量的延迟时间
WatermarkStrategy.forBoundedOutOfOrderness(Duration maxOutOfOrderness)这个方法需要传入一个 maxOutOfOrderness 参数表示“最大乱序程度”它表示数据流中乱序数据时间戳的最大差值
4、自定义水位线
两种
周期性水位线生成器周期性调用的方法onPeriodicEmit()中发出水位线断点式水位线生成器在事件触发的方法onEvent()中发出水位线
1周期性水位线生成器Periodic Generator
周期性生成器一般是通过 onEvent()观察判断输入的事件而在 onPeriodicEmit()里发出水位线。
public class CustomWatermarkStrategy implements WatermarkStrategyEvent {Overridepublic TimestampAssignerEvent createTimestampAssigner(TimestampAssignerSupplier.Context context) {SerializableTimestampAssignerEvent timestampAssigner new SerializableTimestampAssignerEvent(){Overridepublic long extractTimestamp(Event event, long recordTimestamp) {return event.getTimestamp();}};return timestampAssigner;}Overridepublic WatermarkGeneratorEvent createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {WatermarkGeneratorEvent watermarkGenerator new WatermarkGeneratorEvent(){// 延迟private Long delayTime 5000L;// 观察到的最大时间戳这里 delayTime 1L是为了防止溢出private Long maxTs Long.MIN_VALUE delayTime 1L;Overridepublic void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {// 更新最大时间戳maxTs Math.max(event.getTimestamp(), maxTs);}Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {// 发射水位线默认 200ms 调用一次watermarkOutput.emitWatermark(new Watermark(maxTs - delayTime - 1L));}};return watermarkGenerator;}
}2断点式水位生成器
断点式生成器会不停地检测 onEvent()中的事件当发现带有水位线信息的特殊事件时 就立即发出水位线。
public class CustomWatermarkStrategy implements WatermarkStrategyEvent {Overridepublic TimestampAssignerEvent createTimestampAssigner(TimestampAssignerSupplier.Context context) {SerializableTimestampAssignerEvent timestampAssigner new SerializableTimestampAssignerEvent(){Overridepublic long extractTimestamp(Event event, long recordTimestamp) {return event.getTimestamp();}};return timestampAssigner;}Overridepublic WatermarkGeneratorEvent createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {WatermarkGeneratorEvent watermarkGenerator new WatermarkGeneratorEvent(){Overridepublic void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {// 只有在遇到特定的 itemId 时才发出水位线if (event.user.equals(Mary)) {watermarkOutput.emitWatermark(new Watermark(event.timestamp - 1));}}Overridepublic void onPeriodicEmit(WatermarkOutput watermarkOutput) {// 不需要做任何事情}};return watermarkGenerator;}
}5、在自定义数据源中发送水位线 我们也可以在自定义的数据源中抽取事件时间然后发送水位线。这里要注意的是在自定义数据源中发送了水位线以后就不能再在程序中使用assignTimestampsAndWatermarks 方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用 assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。 public class ClickSource extends RichParallelSourceFunctionEvent {// 声明一个布尔变量作为控制数据生成的标识位private Boolean running true;Random random new Random();Overridepublic void run(SourceContext ctx) throws Exception {// 在指定的数据集中随机选取数据String[] users {Mary, Alice, Bob, Cary};String[] urls {./home, ./cart, ./fav, ./prod?id1, ./prod?id2};while (Boolean.TRUE.equals(running)){Event event new Event(users[random.nextInt(users.length)],urls[random.nextInt(urls.length)],Calendar.getInstance().getTimeInMillis());ctx.collectWithTimestamp(event, event.getTimestamp());ctx.emitWatermark(new Watermark(event.timestamp - 1L));Thread.sleep(1000);}}Overridepublic void cancel() {running false;}Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println(open getRuntimeContext().getIndexOfThisSubtask());}Overridepublic void close() throws Exception {super.close();System.out.println(close getRuntimeContext().getIndexOfThisSubtask());}
}6.2.4 水位线的传递 我们知道水位线是数据流中插入的一个标记用来表示事件时间的进展它会随着数据一起在任务间传递。如果只是直通式forward的传输那很简单数据和水位线都是按照本身的顺序依次传递、依次处理的一旦水位线到达了算子任务, 那么这个任务就会将它内部的时钟设为这个水位线的时间戳。 在这里“任务的时钟”其实仍然是各自为政的并没有统一的时钟。实际应用中往往上下游都有多个并行子任务为了统一推进事件时间的进展我们要求上游任务处理完水位线、时钟改变之后要把当前的水位线再次发出广播给所有的下游子任务。这样后续任务就不需要依赖原始数据中的时间戳经过转化处理后数据可能已经改变了也可以知道当前事件时间了 可是还有另外一个问题那就是在“重分区”redistributing的传输模式下一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步所以同一时刻发给下游任务的水位线可能并不相同。这时下游任务又该听谁的呢 这就要回到水位线定义的本质了它表示的是“当前时间之前的数据都已经到齐了”。这是一种保证告诉下游任务“只要你接到这个水位线就代表之后我不会再给你发更早的数据了你可以放心做统计计算而不会遗漏数据”。所以如果一个任务收到了来自上游并行任务的不同的水位线说明上游各个分区处理得有快有慢进度各不相同比如上游有两个并行子任务都发来了水位线一个是 5 秒一个是 7 秒这代表第一个并行任务已经处理完 5 秒之前的 所有数据而第二个并行任务处理到了 7 秒。那这时自己的时钟怎么确定呢当然也要以“这之前的数据全部到齐”为标准。如果我们以较大的水位线 7 秒作为当前时间那就表示“7 秒前的数据都已经处理完”这显然不是事实——第一个上游分区才处理到 5 秒5~7 秒的数据还会不停地发来而如果以最小的水位线 5 秒作为当前时钟就不会有这个问题了因为确实所 有上游分区都已经处理完不会再发 5 秒前的数据了。这让我们想到“木桶原理”所有的上游并行任务就像围成木桶的一块块木板它们中最短的那一块决定了我们桶中的水位。 我们可以用一个具体的例子将水位线在任务间传递的过程完整梳理一遍。如图 6-12 所示当前任务的上游有四个并行子任务所以会接收到来自四个分区的水位线而下游有三个并行子任务所以会向三个分区发出水位线。具体过程如下 1上游并行子任务发来不同的水位线当前任务会为每一个分区设置一个“分区水位线”Partition Watermark这是一个分区时钟而当前任务自己的时钟就是所有分区时钟里最小的那个。 2当有一个新的水位线第一分区的 4从上游传来时当前任务会首先更新对应的分区时钟然后再次判断所有分区时钟中的最小值如果比之前大说明事件时间有了进展当前任务的时钟也就可以更新了。这里要注意更新后的任务时钟并不一定是新来的那个分区水位线比如这里改变的是第一分区的时钟但最小的分区时钟是第三分区的 3于是当前任务时钟就推进到了 3。当时钟有进展时当前任务就会将自己的时钟以水位线的形式广播给下游所有子任务。 3再次收到新的水位线第二分区的 7后执行同样的处理流程。首先将第二个分区时钟更新为 7然后比较所有分区时钟发现最小值没有变化那么当前任务的时钟也不变也不会向下游任务发出水位线。 4同样道理当又一次收到新的水位线第三分区的 6之后第三个分区时钟更新为6同时所有分区时钟最小值变成了第一分区的 4所以当前任务的时钟推进到 4并发出时间戳为 4 的水位线广播到下游各个分区任务。水位线在上下游任务之间的传递非常巧妙地避免了分布式系统中没有统一时钟的问题 每个任务都以“处理完之前所有数据”为标准来确定自己的时钟就可以保证窗口处理的结果总是正确的。对于有多条流合并之后进行处理的场景水位线传递的规则是类似的。 备注 水位线的默认计算公式水位线 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒 在数据流开始之前Flink 会插入一个大小是负无穷大在 Java 中是-Long.MAX_VALUE 的水位线而在数据流结束时Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线保证所有的窗口闭合以及所有的定时器都被触发。对于离线数据集Flink 也会将其作为流读入也就是一条数据一条数据的读取。在这种情况下Flink 对于离线数据集只会插入两次水位线也就是在最开始处插入负无穷大的水位线在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线就可以保证计算的正确无需在数据流的中间插入水位线了 6.3 窗口 我们已经了解了 Flink 中事件时间和水位线的概念那它们有什么具体应用呢当然是做基于时间的处理计算了。其中最常见的场景就是窗口聚合计算。 之前我们已经了解了 Flink 中基本的聚合操作。在流处理中我们往往需要面对的是连续不断、无休无止的无界流不可能等到所有所有数据都到齐了才开始处理。所以聚合计算其实只能针对当前已有的数据——之后再有数据到来就需要继续叠加、再次输出结果。这样似乎很“实时”但现实中大量数据一般会同时到来需要并行处理这样频繁地更新结果就会给系统带来很大负担了。 更加高效的做法是把无界流进行切分每一段数据分别进行聚合结果只输出一次。这就相当于将无界流的聚合转化为了有界数据集的聚合这就是所谓的“窗口”Window聚合操作。窗口聚合其实是对实时性和处理效率的一个权衡。在实际应用中我们往往更关心一段时间内数据的统计结果比如在过去的 1 分钟内有多少用户点击了网页。在这种情况下我们就可以定义一个窗口收集最近一分钟内的所有用户点击数据然后进行聚合统计最终输出一个结果就可以了。 6.3.1 窗口的概念 所以在 Flink 中窗口其实并不是一个“框”流进来的数据被框住了就只能进这一个窗口。相比之下我们应该把窗口理解成一个“桶”如图 6-15 所示。在 Flink 中窗口可以把流切割成有限大小的多个“存储桶”bucket)每个数据都会分发到对应的桶中当到达窗口结束时间时就对每个桶中收集的数据进行计算处理。 我们可以梳理一下事件时间语义下之前例子中窗口的处理过程 1第一个数据时间戳为 2判断之后创建第一个窗口[0, 10并将 2 秒数据保存进去 2后续数据依次到来时间戳均在 [0, 10范围内所以全部保存进第一个窗口 311 秒数据到来判断它不属于[0, 10窗口所以创建第二个窗口[10, 20并将 11秒的数据保存进去。由于水位线设置延迟时间为 2 秒所以现在的时钟是 9 秒第一个窗口也没有到关闭时间 4之后又有 9 秒数据到来同样进入[0, 10窗口中 512 秒数据到来判断属于[10, 20窗口保存进去。这时产生的水位线推进到了 10 秒所以 [0, 10窗口应该关闭了。第一个窗口收集到了所有的 7 个数据进行处理计算后输出结果并将窗口关闭销毁 6同样的之后的数据依次进入第二个窗口遇到 20 秒的数据时会创建第三个窗口[20, 30并将数据保存进去遇到 22 秒数据时水位线达到了 20 秒第二个窗口触发计算输出结果并关闭。 这里需要注意的是Flink 中窗口并不是静态准备好的而是动态创建——当有落在这个窗口区间范围的数据达到时才创建对应的窗口。另外这里我们认为到达窗口结束时间时 窗口就触发计算并关闭事实上“触发计算”和“窗口关闭”两个行为也可以分开 其实就是水位线只是告知不会再收到某个时间点后面的数据了。比如来了个水位线W(10)说明不会再收到小于10的数据了尽管已经收到了事件时间大于10的数据了比如上图收到12、11。但是统计的时候是按照事件时间去统计的。w(10)来的时候会把事件时间落在[010的数据进行统计。
6.3.2 窗口的分类
1、按照驱动类型分类 1时间窗口
就是按照时间段去截取数据 时间窗口类TimeWindow 2计数窗口
计数窗口基于元素的个数来截取数据 为什么不把窗口区间定义成左开右闭、包含上结束时间呢这样maxTimestamp 跟 end 一致不就可以省去一个方法的定义吗 答这主要是为了方便判断窗口什么时候关闭。对于事件时间语义窗口的关闭需要水位线推进到窗口的结束时间而我们知道水位线 Watermark(t)代表的含义是“时间戳小于等于 t 的数据都已到齐不会再来了”。为了简化分析我们先不考虑乱序流设置的延迟时间。那么当新到一个时间戳为 t 的数据时当前水位线的时间推进到了 t – 1还记得乱序流里生成水位线的减一操作吗。所以当时间戳为 end 的数据到来时水位线推进到了 end - 1如果我们把窗口定义为不包含 end那么当前的水位线刚好就是 maxTimestamp表示窗口能够包含的数据都已经到齐我们就可以直接关闭窗口了。所以有了这样的定义我们就不需要再去考虑那烦人的“减一”了直接看到时间戳为 end 的数据就关闭对应的窗口。如果为乱序流设置了水位线延迟时间 delay也只需要等到时间戳为 end delay 的数据就可以关窗了。 2、按照窗口分配数据的规则分类
1滚动窗口 按照固定大小可以是固定的时间间隔或者是固定的数据数量对数据进行划分窗口间没有重叠。
2滑动窗口 滑动窗口的大小也是固定的。区别在于窗口之间并不是首尾相接的 而是可以“错开”一定的位置。如果看作一个窗口的运动那么就像是向前小步“滑动”一样。同样可以基于时间和计数。
3会话窗口 会话窗口顾名思义是基于“会话”session来来对数据进行分组的。这里的会话类似 Web 应用中 session 的概念不过并不表示两端的通讯过程而是借用会话超时失效的机制来描述窗口。简单来说就是数据来了之后就开启一个会话窗口如果接下来还有数据陆续到来 那么就一直保持会话如果一段时间一直没收到数据那就认为会话超时失效窗口自动关闭。这就好像我们打电话一样如果时不时总能说点什么那说明还没聊完如果陷入了尴尬的沉默半天都没话说那自然就可以挂电话了。 与滑动窗口和滚动窗口不同会话窗口只能基于时间来定义而没有“会话计数窗口”的概念。这很好理解“会话”终止的标志就是“隔一段时间没有数据来”如果不依赖时间而改成个数就成了“隔几个数据没有数据来”这完全是自相矛盾的说法。 而同样是基于这个判断标准这“一段时间”到底是多少就很重要了必须明确指定。对于会话窗口而言最重要的参数就是这段时间的长度size它表示会话的超时时间也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔Gap小于指定的大小 size那说明还在保持会话它们就属于同一个窗口如果 gap 大于 size那么新来的数据就应该属于新的会话窗口而前一个窗口就应该关闭了。在具体实现上我们可以设置静态固定的大小size也可以通过一个自定义的提取器gap extractor动态提取最小间隔 gap 的值。 考虑到事件时间语义下的乱序流这里又会有一些麻烦。相邻两个数据的时间间隔 gap 大于指定的 size我们认为它们属于两个会话窗口前一个窗口就关闭可在数据乱序的情况下可能会有迟到数据它的时间戳刚好是在之前的两个数据之间的。这样一来之前我们判断的间隔中就不是“一直没有数据”而缩小后的间隔有可能会比 size 还要小——这代表三个数据本来应该属于同一个会话窗口。 所以在 Flink 底层对会话窗口的处理会比较特殊每来一个新的数据都会创建一个新的会话窗口然后判断已有窗口之间的距离如果小于给定的 size就对它们进行合并merge 操作。在 Window 算子中对会话窗口会有单独的处理逻辑。 我们可以看到与前两种窗口不同会话窗口的长度不固定起始和结束时间也是不确定的各个分区之间窗口没有任何关联。如图会话窗口之间一定是不会重叠的而且会留有至少为 size 的间隔session gap。
4全局窗口
这种窗口全局有效会把相同 key 的所有数据都分配到同一个窗口中说直白一点就跟没分窗口一样。无界流的数据永无止尽所以这种窗口也没有结束的时候默认是不会做触发计算的。如果希望它能对数据进行计算处理 还需要自定义“触发器”Trigger。
6.3.3 窗口API概览
1、按键分区和非按键分区
在定义窗口操作之前需要确定是基于按键分区还是非按键分区数据流上开窗。
1按键分区
stream.keyBy(...).window(...);2非按键分区
stream.windowAll(...)2、代码中窗口API的调用
窗口操作主要有两个部分窗口分配器Window Assigners和窗口函数Window Functions
stream.keyBy(key selector).window(window assigner).aggregate(window function).window()方法需要传入一个窗口分配器它指明了窗口的类型而后面的.aggregate() 方法传入一个窗口函数作为参数它用来定义窗口具体的处理逻辑。
**窗口分配器**指定用哪一种窗口时间 or 计数滑动、滚动、会话…
**窗口函数**对窗口的数据的计算逻辑
6.3.4 窗口分配器
定义窗口分配器Window Assigners是构建窗口算子的第一步它的作用就是定义数据应该被“分配”到哪个窗口。
1、时间窗口
1滚动处理时间窗口
stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)这里.of()方法需要传入一个 Time 类型的参数 size表示滚动窗口的大小我们这里创建了一个长度为 5 秒的滚动窗口。 另外.of()还有一个重载方法可以传入两个 Time 类型的参数size 和 offset。第一个参数当然还是窗口大小第二个参数则表示窗口起始点的偏移量。这里需要多做一些解释对于我们之前的定义滚动窗口其实只有一个 size 是不能唯一确定的。比如我们定义 1 天的滚动窗口从每天的 0 点开始计时是可以的统计的就是一个自然日的所有数据而如果从每天的 凌晨 2 点开始计时其实也完全没问题只不过统计的数据变成了每天 2 点到第二天 2 点。这个起始点的选取其实对窗口本身的类型没有影响而为了方便应用默认的起始点时间戳是窗口大小的整倍数。也就是说如果我们定义 1 天的窗口默认就从 0 点开始如果定义 1 小时的窗口默认就从整点开始。而如果我们非要不从这个默认值开始那就可以通过设置偏移量offset 来调整。 这里读者可能会觉得奇怪这个功能好像没什么用非要弄个偏移量不是给自己找别扭吗这其实是有实际用途的。我们知道不同国家分布在不同的时区。标准时间戳其实就是 1970 年 1 月 1 日 0 时 0 分 0 秒 0 毫秒开始计算的一个毫秒数而这个时间是以 UTC 时间也就是 0 时区伦敦时间为标准的。我们所在的时区是东八区也就是 UTC8跟 UTC 有 8 小时的时差。我们定义 1 天滚动窗口时如果用默认的起始点那么得到就是伦敦时间每天 0 点开启窗口这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢只要设置-8 小时的偏移量就可以了 .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))2滑动处理时间窗口
stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)3处理时间会话窗口
静态会话超时时间会话窗口
stream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)动态会话超时时间的会话窗口
.window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractorTuple2String, Long() {Overridepublic long extract(Tuple2String, Long element) {// 提取 session gap 值返回, 单位毫秒return element.f0.length() * 1000;}}
))4滚动事件时间窗口
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)5滑动事件时间窗口
stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(...)6事件时间会话窗口
stream.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)2、计数窗口
1滚动计数窗口
stream.keyBy(...).countWindow(10)2滑动计数窗口
stream.keyBy(...).countWindow(10)3、全局窗口
全局窗口是计数窗口的底层实现一般在需要自定义窗口时使用。需要注意使用全局窗口必须自行定义触发器才能实现窗口计算否则起不到任何作用。
stream.keyBy(...).window(GlobalWindows.create());6.3.5 窗口函数 定义了窗口分配器我们只是知道了数据属于哪个窗口可以将数据收集起来了至于收集起来到底要做什么其实还完全没有头绪。所以在窗口分配器之后必须再接上一个定义窗口如何进行计算的操作这就是所谓的“窗口函数”window functions。 经窗口分配器处理之后数据可以分配到对应的窗口中而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream所以并不能直接进行其他转换而必须进一步调用窗口函数对收集到的数据进行处理计算之后才能最终再次得到 DataStream 窗口函数根据处理的方式可以分为两类增量聚合函数和全窗口函数
1、增量聚合函数 窗口将数据收集起来最基本的处理操作当然就是进行聚合。窗口对无限流的切分可以看作得到了一个有界数据集。如果我们等到所有数据都收集齐在窗口到了结束时间要输出结果的一瞬间再去进行聚合显然就不够高效了——这相当于真的在用批处理的思路来做实时流处理。 为了提高实时性我们可以再次将流处理的思路发扬光大就像 DataStream 的简单聚合一样每来一条数据就立即进行计算中间只要保持一个简单的聚合状态就可以了区别只是在于不立即输出结果而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候我们只需要拿出之前聚合的状态直接输出这无疑就大大提高了程序运行的效率和实时性。 典型的增量聚合函数有两个ReduceFunction 和 AggregateFunction。
1归约函数
就是将中间结果和新来的数据两两归约
窗口函数中也提供了 ReduceFunction只要基于 WindowedStream 调用.reduce()方法然后传入 ReduceFunction 作为参数就可以指定以归约两个元素的方式去对窗口中数据进行聚合了。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorEvent stream env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.EventforBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event event, long l) {return event.getTimestamp();}}));stream.map(new MapFunctionEvent, Tuple2String, Long() {Overridepublic Tuple2String, Long map(Event event) throws Exception {return Tuple2.of(event.url, 1L);}}).keyBy(r-r.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(new MyReduceFunction()).print();env.execute();}// 自定义归约函数
public class MyReduceFunction implements ReduceFunctionTuple2String, Long {Overridepublic Tuple2String, Long reduce(Tuple2String, Long value1, Tuple2String, Long value2) throws Exception {// 定义累加规则return Tuple2.of(value1.f0, value1.f1 value2.f1);}
}ReduceFunction 可以解决大多数归约聚合的问题但是这个接口有一个限制就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
2聚合函数
在有些情况下还需要对状态进行进一步处理才能得到输出结果这时它们的类型可能不同使用 ReduceFunction 就会非常麻烦 例如如果我们希望计算一组数据的平均值应该怎样做聚合呢很明显这时我们需要计算两个状态量数据的总和sum以及数据的个数count而最终输出结果是两者的商 sum/count。如果用 ReduceFunction那么我们应该先把数据转换成二元组(sum, count)的形式然后进行归约聚合最后再将元组的两个元素相除转换得到最后的平均值。本来应该只是一个任务可我们却需要 map-reduce-map 三步操作这显然不够高效。 于是自然可以想到如果取消类型一致的限制让输入数据、中间状态、输出结果三者类型都可以不同不就可以一步直接搞定了吗 Flink 的 Window API 中的 aggregate 就提供了这样的操作。直接基于 WindowedStream 调用.aggregate() 方法 就可以定义更加灵活的窗口聚合操作。这个方法需要传入一个 AggregateFunction 的实现类作为参数。AggregateFunction 在源码中的定义如下:
public interface AggregateFunctionIN, ACC, OUT extends Function, Serializable
{ACC createAccumulator();ACC add(IN value, ACC accumulator);OUT getResult(ACC accumulator);ACC merge(ACC a, ACC b);
}AggregateFunction 可以看作是 ReduceFunction 的通用版本这里有三种类型输入类型 IN、累加器类型ACC和输出类型OUT。输入类型 IN 就是输入流中元素的数据类型累加器类型 ACC 则是我们进行聚合的中间状态类型而输出类型当然就是最终计算结果的类型了。 接口中有四个方法 createAccumulator()创建一个累加器这就是为聚合创建了一个初始状态每个聚合任务只会调用一次 add()将输入的元素添加到累加器中。这就是基于聚合状态对新来的数据进行进一步聚合的过程。方法传入两个参数当前新到的数据 value和当前的累加器accumulator返回一个新的累加器值也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法 getResult()从累加器中提取聚合的输出结果。也就是说我们可以定义多个状态 然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值就可以把 sum 和 count 作为状态放入累加器而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用 merge()合并两个累加器并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用最常见的合并窗口Merging Window的场景就是会话窗口Session Windows 所以可以看到AggregateFunction 的工作原理是首先调用 createAccumulator()为任务初始化一个状态(累加器)而后每来一个数据就调用一次 add()方法对数据进行聚合得到的结果保存在状态中等到了窗口需要输出时再调用 getResult()方法得到计算结果。很明显 与 ReduceFunction 相同AggregateFunction 也是增量式的聚合而由于输入、中间状态、输出的类型可以不同使得应用更加灵活方便。 下面来看一个具体例子。我们知道在电商网站中PV页面浏览量和 UV独立访客数是非常重要的两个流量指标。一般来说PV 统计的是所有的点击量而对用户 id 进行去重之后得到的就是 UV。所以有时我们会用 PV/UV 这个比值来表示“人均重复访问量”也就是平均每个用户会访问多少次页面这在一定程度上代表了用户的粘度 代码略 代码中我们创建了事件时间滑动窗口统计 10 秒钟的“人均 PV”每 2 秒统计一次。由于聚合的状态还需要做处理计算因此窗口聚合时使用了更加灵活的 AggregateFunction。为了统计 UV我们用一个 HashSet 保存所有出现过的用户 id实现自动去重而 PV 的统计则类似一个计数器每来一个数据加一就可以了。所以这里的状态定义为包含一个 HashSet 和一个 count 值的二元组Tuple2HashSet, Long每来一条数据就将 user 存入 HashSet同时 count 加 1。这里的 count 就是 PV而 HashSet 中元素的个数size就是 UV所以最终窗口的输出结果就是它们的比值。 这里没有涉及会话窗口所以 merge()方法可以不做任何操作。 另外Flink 也为窗口的聚合提供了一系列预定义的简单聚合方法 可以直接基于 WindowedStream 调用。主要包括.sum()/max()/maxBy()/min()/minBy()与 KeyedStream 的简单聚合非常相似。它们的底层其实都是通过 AggregateFunction 来实现的。 通过 ReduceFunction 和 AggregateFunction 我们可以发现增量聚合函数其实就是在用流处理的思路来处理有界数据集核心是保持一个聚合状态当数据到来时不停地更新状态。这就是 Flink 所谓的“有状态的流处理”通过这种方式可以极大地提高程序运行的效率所以在实际应用中最为常见。 2、全窗口函数 窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同全窗口函数需要先收集窗口中的数据并在内部缓存起来等到窗口要输出结果的时候再取出数据进行计算。 很明显这就是典型的批处理思路了——先攒数据等一批都到齐了再正式启动处理流程。这样做毫无疑问是低效的因为窗口全部的计算任务都积压在了要输出结果的那一瞬间而在之前收集数据的漫长过程中却无所事事。这就好比平时不用功到考试之前通宵抱佛脚肯定不如把工夫花在日常积累上。 那为什么还需要有全窗口函数呢这是因为有些场景下我们要做的计算必须基于全部的数据才有效这时做增量聚合就没什么意义了另外输出的结果有可能要包含上下文中的一些信息比如窗口的起始时间这是增量聚合函数做不到的。所以我们还需要有更丰富的窗口计算方式这就可以用全窗口函数来实现。 在 Flink 中全窗口函数也有两种WindowFunction 和 ProcessWindowFunction。
1窗口函数
我们可以基于 WindowedStream 调用.apply()方法传入一个 WindowFunction 的实现类。
stream.keyBy(key selector).window(window assigner).apply(new MyWindowFunction());这个类中可以获取到包含窗口所有数据的可迭代集合 Iterable还可以拿到窗口Window本身的信息。
public interface WindowFunctionIN, OUT, KEY, W extends Window extends Function, Serializable {void apply(KEY key, W window, IterableIN input, CollectorOUT out) throws Exception;
}当窗口到达结束时间需要触发计算时就会调用这里的 apply 方法。我们可以从 input 集合中取出窗口收集的数据结合 key 和 window 信息通过收集器Collector输出结果。这里 Collector 的用法与 FlatMapFunction 中相同。 不过我们也看到了WindowFunction 能提供的上下文信息较少也没有更高级的功能。事实上它的作用可以被 ProcessWindowFunction 全覆盖所以之后可能会逐渐弃用。一般在实际应用直接使用 ProcessWindowFunction 就可以了。 2处理窗口函数 ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”是因为除了可以拿到窗口中的所有数据之外ProcessWindowFunction 还可以获取到一个“上下文对象”Context。这个上下文对象非常强大不仅能够获取窗口信息还可以访问当前的时间和状态信息。这里的时间就包括了处理时间processing time和事件时间水位线event time watermark。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上ProcessWindowFunction 是 Flink 底层 API——处理函数process function中的一员关于处理函数我们会在后续章节展开讲解。 当然这些好处是以牺牲性能和资源为代价的。作为一个全窗口函数ProcessWindowFunction 同样需要将所有数据缓存下来、等到窗口触发计算时才使用。它其实就是一个增强版的 WindowFunction。 具体使用跟 WindowFunction 非常类似我们可以基于 WindowedStream 调用.process()方法传入一个 ProcessWindowFunction 的实现类。下面是一个电商网站统计每小时 UV 的例子 代码略3、增量聚合和全窗口函数的结合使用 这样调用的处理机制是基于第一个参数增量聚合函数来处理窗口数据每来一个数据就做一次聚合等到窗口需要触发计算时则调用第二个参数全窗口函数的处理逻辑输出结果。需要注意的是这里的全窗口函数就不再缓存所有数据了而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下这时的可迭代集合中就只有一个元素了。 窗口处理的主体还是增量聚合而引入全窗口函数又可以获取到更多的信息包装输出 // ReduceFunction 与 WindowFunction 结合
public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunction, WindowFunctionT, R, K, W function)// ReduceFunction 与 ProcessWindowFunction 结合
public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunction, ProcessWindowFunctionT, R, K, W function)// AggregateFunction 与 WindowFunction 结合
public ACC, V, R SingleOutputStreamOperatorR aggregate(AggregateFunctionT, ACC, V aggFunction, WindowFunctionV, R, K, W windowFunction)// AggregateFunction 与 ProcessWindowFunction 结合
public ACC, V, R SingleOutputStreamOperatorR aggregate(AggregateFunctionT, ACC, V aggFunction, ProcessWindowFunctionV, R, K, W windowFunction)
代码略6.3.6 测试水位线和窗口的使用
代码略6.3.7 其它API
1、触发器
用来控制窗口什么时候触发计算
2、移除器
主要用来定义移除某些数据的逻辑
3、允许延迟 可以为窗口算子设置一个“允许的最大延迟”Allowed Lateness。也就是说我们可以设定允许延迟一段时间在这段时间内窗口不会销毁继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 延迟时间才真正将窗口的内容清空正式关闭窗口。 4、将迟到的数据放到侧输出流 我们自然会想到即使可以设置窗口的延迟时间终归还是有限的后续的数据还是会被丢弃。如果不想丢弃任何一个数据又该怎么做呢 Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据放入“侧输出流”side output进行另外的处理。所谓的侧输出流相当于是数据流的一个“分支”这个流中单独放置那些错过了该上的车、本该被丢弃的数据。 6.3.8 窗口的生命周期
1、窗口的创建 窗口的类型和基本信息由窗口分配器window assigners指定但窗口不会预先创建好 而是由数据驱动创建。当第一个应该属于这个窗口的数据元素到达时就会创建对应的窗口。 2、窗口计算的触发 除了窗口分配器每个窗口还会有自己的窗口函数window functions和触发器trigger。窗口函数可以分为增量聚合函数和全窗口函数主要定义了窗口中计算的逻辑而触发器则是指定调用窗口函数的条件。 对于不同的窗口类型触发计算的条件也会不同。例如一个滚动事件时间窗口应该在水位线到达窗口结束时间的时候触发计算属于“定点发车”而一个计数窗口会在窗口中元素数量达到定义大小时触发计算属于“人满就发车”。所以 Flink 预定义的窗口类型都有对应内置的触发器。 对于事件时间窗口而言除去到达结束时间的“定点发车”还有另一种情形。当我们设置了允许延迟那么如果水位线超过了窗口结束时间、但还没有到达设定的最大延迟时间这期间内到达的迟到数据也会触发窗口计算。这类似于没有准时赶上班车的人又追上了车这时车要再次停靠、开门将新的数据整合统计进来。 3、窗口的销毁 一般情况下当时间达到了结束点就会直接触发计算输出结果、进而清除状态销毁窗口。这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意 Flink 中只对时间窗口TimeWindow有销毁机制由于计数窗口CountWindow是基于全局窗口GlobalWindw 实现的而全局窗口不会清除状态所以就不会被销毁。 在特殊的场景下窗口的销毁和触发计算会有所不同。事件时间语义下如果设置了允许延迟那么在水位线到达窗口结束时间时仍然不会销毁窗口窗口真正被完全删除的时间点 是窗口的结束时间加上用户指定的允许延迟时间。 4、窗口API调用总结 Window API 首先按照时候按键分区分成两类。keyBy 之后的 KeyedStream可以调用.window()方法声明按键分区窗口Keyed Windows而如果不做 keyByDataStream 也可以直接调用.windowAll()声明非按键分区窗口。之后的方法调用就完全一样了。 接下来首先是通过.window()/.windowAll()方法定义窗口分配器得到 WindowedStream 然 后 通 过 各 种 转 换 方 法 reduce/aggregate/apply/process 给 出 窗 口 函 数 (ReduceFunction/AggregateFunction/ProcessWindowFunction)定义窗口的具体计算处理逻辑 转换之后重新得到 DataStream。这两者必不可少是窗口算子WindowOperator最重要的组成部分。 此外在这两者之间还可以基于 WindowedStream 调用.trigger()自定义触发器、调用.evictor()定义移除器、调用.allowedLateness()指定允许延迟时间、调用.sideOutputLateData() 将迟到数据写入侧输出流这些都是可选的 API一般不需要实现。而如果定义了侧输出流 可以基于窗口聚合之后的 DataStream 调用.getSideOutput()获取侧输出流。 6.4 迟到的数据处理 对于乱序流水位线本身就可以设置一个延迟时间而做窗口计算时我们又可以设置窗口的允许延迟时间另外窗口还有将迟到数据输出到测输出流的用法。所有的这些方法它们之间有什么关系我们又该怎样合理利用呢 6.4.1 设置水位线延迟时间 水位线是事件时间的进展它是我们整个应用的全局逻辑时钟。水位线生成之后会随着数据在任务间流动从而给每个任务指明当前的事件时间。所以从这个意义上讲水位线是一个覆盖万物的存在它并不只针对事件时间窗口有效。 之前我们讲到触发器时曾提到过“定时器”时间窗口的操作底层就是靠定时器来控制触发的。既然是底层机制定时器自然就不可能是窗口的专利了事实上它是 Flink 底层 API——处理函数process function的重要部分。 所以水位线其实是所有事件时间定时器触发的判断标准。那么水位线的延迟当然也就是全局时钟的滞后相当于是上帝拨动了琴弦所有人的表都变慢了。 既然水位线这么重要那一般情况就不应该把它的延迟设置得太大否则流处理的实时性就会大大降低。因为水位线的延迟主要是用来对付分布式网络传输导致的数据乱序而网络传输的乱序程度一般并不会很大大多集中在几毫秒至几百毫秒。所以实际应用中我们往往会给水位线设置一个“能够处理大多数乱序数据的小延迟”视需求一般设在毫秒~秒级。 当我们设置了水位线延迟时间后所有定时器就都会按照延迟后的水位线来触发。如果一个数据所包含的时间戳小于当前的水位线那么它就是所谓的“迟到数据” 6.4.2 允许窗口处理迟到数据 水位线延迟设置的比较小那之后如果仍有数据迟到该怎么办对于窗口计算而言如果水位线已经到了窗口结束时间默认窗口就会关闭那么之后再来的数据就要被丢弃了。 自然想到Flink 的窗口也是可以设置延迟时间允许继续处理迟到数据的。 这种情况下由于大部分乱序数据已经被水位线的延迟等到了所以往往迟到的数据不会太多。这样我们会在水位线到达窗口结束时间时先快速地输出一个近似正确的计算结果 然后保持窗口继续等到延迟数据每来一条数据窗口就会再次计算并将更新后的结果输出。这样就可以逐步修正计算结果最终得到准确的统计值了。 类比班车的例子我们可以这样理解大多数人是在发车时刻前后到达的所以我们只要把表调慢稍微等一会儿绝大部分人就都上车了这个把表调慢的时间就是水位线的延迟 到点之后班车就准时出发了不过可能还有该来的人没赶上。于是我们就先慢慢往前开这段时间内如果迟到的人抓点紧还是可以追上的如果有人追上来了就停车开门让他上来 然后车继续向前开。当然我们的车不能一直慢慢开需要有一个时间限制这就是窗口的允许延迟时间。一旦超过了这个时间班车就不再停留开上高速疾驰而去了。 所以我们将水位线的延迟和窗口的允许延迟数据结合起来最后的效果就是先快速实时地输出一个近似的结果而后再不断调整最终得到正确的计算结果。回想流处理的发展过程 这不就是著名的 Lambda 架构吗原先需要两套独立的系统来同时保证实时性和结果的最终正确性如今 Flink 一套系统就全部搞定了 6.4.3 将迟到数据放到窗口侧输出流 即使我们有了前面的双重保证可窗口不能一直等下去最后总要真正关闭。窗口一旦关闭后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢 那就要用到最后一招了用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后 “兜底”的方法只能保证数据不丢失因为窗口已经真正关闭所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来然后获取侧输出流中的迟到数据判断数据所属的窗口手动对结果进行合并更新。尽管有些烦琐实时性也不够强但能够保证最终结果一定是正确的。 如果还用赶班车来类比那就是车已经上高速开走了这班车是肯定赶不上了。不过我们还留下了行进路线和联系方式迟到的人如果想办法辗转到了目的地还是可以和大部队会合的。最终所有该到的人都会在目的地出现。 所以总结起来Flink 处理迟到数据对于结果的正确性有三重保障水位线的延迟窗口允许迟到数据以及将迟到数据放入窗口侧输出流。我们可以回忆一下之前 6.3.5 小节统计每个 url 浏览次数的代码 UrlViewCountExample稍作改进增加处理迟到数据的功能。 代码略