重庆市网站备案,联合年检在什么网站做,有电脑网站怎样建手机号码,ftp网站怎么看后台的代码一、窗口
窗口#xff08;Window#xff09;是处理无界流的关键所在。窗口将流分成有限大小的“桶”#xff0c;我们可以在其上应用算子计算。Flink可以使用window()和windowAll()定义一个窗口#xff0c;二者都需要传入一个窗口分配器WindowAssigner#xff0c;WindowAs…一、窗口
窗口Window是处理无界流的关键所在。窗口将流分成有限大小的“桶”我们可以在其上应用算子计算。Flink可以使用window()和windowAll()定义一个窗口二者都需要传入一个窗口分配器WindowAssignerWindowAssigner负责分配事件到相应的窗口。
window()作用于KeyedStream上即keyBy()之后这样可以多任务并行计算对窗口内的多组数据分别进行聚合windowAll()作用于非KeyedStream上通常指DataStream由于所有元素都必须通过相同的算子实例因此该操作本质上是非并行的仅在特殊情况下例如对齐的时间窗口才可以并行执行。假设要计算24小时内每个用户的订单平均消费额就需要使用window()定义窗口如果要计算24小时内的所有订单平均消费额则需要使用windowAll()定义窗口。
一个Flink窗口程序的大致骨架结构如下
对KeyedStream应用window()函数进行窗口计算
对非KeyedStream应用windowAll()函数进行窗口计算 上面方括号[…]中的命令是可选的。也就是说Flink 允许你自定义多样化的窗口操作来满足你的需求。
首先必须要在定义窗口前确定的是你的 stream 是 keyed 还是 non-keyed。 keyBy(…) 会将你的无界 stream 分割为逻辑上的 keyed stream。 如果 keyBy(…) 没有被调用你的 stream 就不是 keyed。
对于 keyed stream其中数据的任何属性都可以作为 key。 使用 keyed stream 允许你的窗口计算由多个 task 并行因为每个逻辑上的 keyed stream 都可以被单独处理。 属于同一个 key 的元素会被发送到同一个 task。
对于 non-keyed stream原始的 stream 不会被分割为多个逻辑上的 stream 所以所有的窗口计算会被同一个 task 完成也就是 parallelism 为 1。
二、窗口的分类
Flink的窗口可以分为滚动窗口、滑动窗口、会话窗口、全局窗口且每种窗口又可分别根据事件时间和处理时间进行创建。
2.1 滚动窗口Tumbling Windows 滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的且各自范围之间不重叠。 比如说如果你指定了滚动窗口的大小为 5 分钟那么每 5 分钟就会有一个窗口被计算且一个新的窗口被创建如下图所示。 滚动窗口使用时需要指定窗口大小参数下面的代码片段展示了如何使用滚动窗口 滚动窗口分配器还可以使用可选的偏移(Offset)参数该参数可用于更改窗口的对齐方式。例如在没有偏移的情况下时间窗口会做一个对齐那么1小时窗口的起止时间可以是[0:00:00.000~0:59:59.999。如果你想要一个以小时为单位的窗口流但是窗口需要从每个小时的第15分钟开始则可以使用偏移量代码如下
TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(15))那么窗口的起止时间将变为[0:15:00.000~1:14:59.999这样你将得到起始时间在0:15:001:15:002:15:00的窗口。与此相反如果你生活在不使用UTC±00:00时间世界标准时间的地方例如中国使用UTC08:00中国的当地时间要设置偏移量为Time.hours(-8)。你想要一个一天大小的时间窗口并且窗口从当地时间的每一个00:00:00开始可以使用
TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8))因为UTC08:00比UTC时间早8小时。
2.2 滑动窗口Sliding Windows 与滚动窗口类似滑动窗口的 assigner 分发元素到指定大小的窗口窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离window slide参数来控制生成新窗口的频率。 因此如果 slide 小于窗口大小滑动窗口可以允许窗口重叠。这种情况下一个元素可能会被分发到多个窗口。
比如说你设置了大小为 10 分钟滑动距离 5 分钟的窗口你会在每 5 分钟得到一个新的窗口 里面包含之前 10 分钟到达的数据如下图所示。 使用滑动窗口时需要设置窗口大小和滑动步长两个参数。滑动步长决定了Flink以多大的频率来创建新的窗口步长较小窗口的个数会很多。步长小于窗口的大小时相邻窗口会重叠一个事件会被分配到多个窗口步长大于窗口大小时有些事件可能会丢失。
下面的代码片段展示了如何使用滑动窗口 如上一个例子所示滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。 比如说不设置 offset 时长度为一小时、滑动距离为 30 分钟的滑动窗口会与 linux 的 epoch 对齐。 你会得到如 1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999 等。 如果你想改变对齐方式你可以设置一个 offset。 如果设置了 15 分钟的 offset你会得到 1:15:00.000 - 2:14:59.999、1:45:00.000 - 2:44:59.999 等。 一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说在中国你可能会设置 offset 为 Time.hours(-8)。
2.3 会话窗口Session Windows 会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同会话窗口不会相互重叠且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔session gap或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段当前的会话就会关闭并且将接下来的数据分发到新的会话窗口。 下面的代码展示了如何使用会话窗口。 创建动态间隔会话窗口需要实现SessionWindowTimeGapExtractor接口并实现其中的extract()方法可以在extract()方法中加入相应的业务逻辑来动态控制会话间隔。 2.4 全局窗口Global Windows 全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则计算不会发生因为全局窗口没有天然的终点去触发其中积累的数据。 下面的代码片段展示了如何使用全局窗口
三、窗口函数Window Functions
事件被窗口分配器分配到窗口后接下来需要指定想要在每个窗口上执行的计算函数即窗口函数以便对窗口内的数据进行处理。Flink提供的窗口函数有ReduceFunction、AggregateFunction、ProcessWindowFunction。
ReduceFunction和AggregateFunction是增量计算函数都可以基于中间状态对窗口中的元素进行递增聚合。例如窗口每流入一个新元素新元素就会与中间数据进行合并生成新的中间数据再保存到窗口中。
ProcessWindowFunction是全量计算函数如果需要依赖窗口中的所有数据或需要获取窗口中的状态数据和窗口元数据窗口开始时间、窗口结束时间等就需要使用ProcessWindowFunction。例如对整个窗口数据排序取TopN使用ProcessWindowFunction就非常灵活。
3.1 ReduceFunction
ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据输入和输出数据的类型必须相同。 Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。
ReduceFunction 可以像下面这样定义 上面的例子是对窗口内元组的第二个属性求和。
3.2 AggregateFunction AggregateFunction是聚合函数的基本接口也是ReduceFunction的通用版本。与ReduceFunction相同Flink将在窗口输入元素到达时对其进行增量聚合。
AggregateFunction的泛型具有3种类型输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型指的是输入流中元素的类型。AggregateFunction具有一种将一个输入元素添加到累加器的方法还具有创建初始累加器将两个累加器合并为一个累加器以及从累加器提取输出OUT类型的方法。
AggregateFunction是一种灵活的聚合函数具有以下特点:
可以对输入值、中间聚合和结果使用不同的类型以支持广泛的聚合类型。支持分布式聚合不同的中间聚合可以合并在一起以允许预聚合最终聚合优化。
AggregateFunction的中间聚合正在进行的聚合状态称为累加器。将值添加到累加器并通过结束累加器状态获得最终的聚合。中间聚合的数据类型可能与最终的聚合结果类型不同例如求平均值时需要保存计数和总和作为中间聚合。合并中间聚合部分聚合意味着合并累加器。
AggregationFunction本身是无状态的。为了允许单个AggregationFunction实例维护多个聚合例如每个Key一个聚合AggregationFunction在新聚合启动时会创建一个新的累加器。此外聚合函数必须是可序列化的因为它们在分布式执行期间会在分布式进程之间发送。
AggregationFunction接口的Java定义源码如下 例如计算窗口中每组元素的第二个字段的平均值定义和使用AggregateFunction的代码片段如下 注 AggregateFunction的merge()方法用于会话窗口。当会话窗口彼此之间的实际间隔比已定义的间隔小时它们将合并在一起。为了可合并对会话窗口计算时也需要相应的触发器和窗口函数例如ReduceFunctionAggregateFunction或ProcessWindowFunction。当需要合并两个会话窗口时merge()方法会被调用通过该方法合并两个窗口的结果。对于滚动窗口和滑动窗口不会调用merge()方法。 3.3 .ProcessWindowFunction
ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable 以及用来获取时间和状态信息的 Context 对象比其他窗口函数更加灵活。 ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的 因为窗口中的数据无法被增量聚合而需要在窗口触发前缓存所有数据。因此使用ProcessWindowFunction需要注意数据量不应太大否则会造成内存溢出。
抽象类ProcessWindowFunction的源码如下
PublicEvolving
public abstract class ProcessWindowFunctionIN, OUT, KEY, W extends Windowextends AbstractRichFunction {private static final long serialVersionUID 1L;/*** Evaluates the window and outputs none or several elements.** param key The key for which this window is evaluated.* param context The context in which the window is being evaluated.* param elements The elements in the window being evaluated.* param out A collector for emitting elements.* throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public abstract void process(KEY key, Context context, IterableIN elements, CollectorOUT out) throws Exception;/*** Deletes any state in the {code Context} when the Window expires (the watermark passes its* {code maxTimestamp} {code allowedLateness}).** param context The context to which the window is being evaluated* throws Exception The function may throw exceptions to fail the program and trigger recovery.*/public void clear(Context context) throws Exception {}/** The context holding window metadata. */public abstract class Context implements java.io.Serializable {/** Returns the window that is being evaluated. */public abstract W window();/** Returns the current processing time. */public abstract long currentProcessingTime();/** Returns the current event-time watermark. */public abstract long currentWatermark();/*** State accessor for per-key and per-window state.** pbNOTE:/bIf you use per-window state you have to ensure that you clean it up by* implementing {link ProcessWindowFunction#clear(Context)}.*/public abstract KeyedStateStore windowState();/** State accessor for per-key global state. */public abstract KeyedStateStore globalState();/*** Emits a record to the side output identified by the {link OutputTag}.** param outputTag the {code OutputTag} that identifies the side output to emit to.* param value The record to emit.*/public abstract X void output(OutputTagX outputTag, X value);}
}key 参数由 keyBy() 中指定的 KeySelector 选出。 如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key这个 key 的类型将总是 Tuple 并且你需要手动将它转换为正确大小的 tuple 才能提取 key。
ProcessWindowFunction 可以像下面这样定义 使用ProcessWindowFunction来处理简单的聚合例如计算元素数量是非常低效的。接下来讲解如何将ReduceFunction或AggregateFunction与ProcessWindowFunction结合起来以便实现增量聚合并通过ProcessWindowFunction获得额外的窗口信息等。
3.4 带增量聚合的ProcessWindowFunction
由于ProcessWindowFunction是全量计算函数如果既要获得窗口信息又要进行增量聚合则可以将ProcessWindowFunction与ReduceFunction或AggregateFunction结合使用。
ProcessWindowFunction可以与ReduceFunction或AggregateFunction组合在一起以便在元素到达窗口时增量地聚合。当窗口关闭时ProcessWindowFunction将提供聚合的结果。
3.4.1 结合ReduceFunction实现增量聚合 下例展示了如何将 ReduceFunction 与 ProcessWindowFunction 组合返回窗口中的最小元素和窗口的开始时间。 3.4.2 结合AggregateFunction实现增量聚合 下例展示了如何将 AggregateFunction 与 ProcessWindowFunction 组合计算平均值并与窗口对应的 key 一同输出。