当前位置: 首页 > news >正文

西安做网站朋朋有用免费模板网

西安做网站朋朋,有用免费模板网,制作京东一样的网站,青岛网络推广公司前言 Flink 数据流经过 keyBy 分组后#xff0c;下一步就是 WindowAssigner。 WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口#xff0c;元素可以被分发到一个或多个窗口中#xff0c;Flink 内置了常用的窗口分配器#xff0c;包括#xff1a;tumbling wi…前言 Flink 数据流经过 keyBy 分组后下一步就是 WindowAssigner。 WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口元素可以被分发到一个或多个窗口中Flink 内置了常用的窗口分配器包括tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows 其它分配器都是基于时间来分发数据的。 当然你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。 WindowAssigner 先看一下 WindowAssigner 抽象类的定义 PublicEvolving public abstract class WindowAssignerT, W extends Window implements Serializable {private static final long serialVersionUID 1L;public WindowAssigner() {}public abstract CollectionW assignWindows(T var1, long var2, WindowAssignerContext var4);public TriggerT, W getDefaultTrigger() {return this.getDefaultTrigger(new StreamExecutionEnvironment());}/** deprecated */Deprecatedpublic abstract TriggerT, W getDefaultTrigger(StreamExecutionEnvironment var1);public abstract TypeSerializerW getWindowSerializer(ExecutionConfig var1);public abstract boolean isEventTime();PublicEvolvingpublic abstract static class WindowAssignerContext {public WindowAssignerContext() {}public abstract long getCurrentProcessingTime();} }四个方法作用如下 assignWindows 将元素 element 分发到一个或多个窗口返回值是窗口集合getDefaultTrigger 返回默认的窗口触发器 TriggergetWindowSerializer 返回窗口序列化器窗口也要在算子间传输isEventTime 是否基于事件时间语义 Flink 内置的 WindowAssigner 实现类关系图如下 首先可以按照基于何种时间语义划分出三大类 基于事件时间语义基于处理时间语义不基于时间语义 -- GlobalWindows 在基于时间语义的大类下面又可以按照时间窗口算法划分为三个具体实现 滚动窗口分配算法 tumbling windows滑动窗口分配算法 sliding windows会话窗口分配算法 session windows 定义窗口Window 窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.WindowFlink 内置了两种实现分别是 TimeWindow 基于时间范围的窗口包含开始时间戳和结束时间戳GlobalWindow 全局窗口与时间无关的窗口 如果内置的这两种窗口无法满足你的需求你也可以自定义窗口。需要注意的是窗口本身是要在算子间传输的所以你在自定义窗口的同时还必须提供一个窗口序列化器以便于 Flink 可以将你的窗口对象序列化传输。 如下示例我们定义了一个基于数字范围的 NumberWindow可以将一个数字划分到对应的数字范围窗口内。 public class NumberWindow extends Window {private final int min;private final int max;public NumberWindow(int min, int max) {this.min min;this.max max;}public int getMin() {return min;}public int getMax() {return max;}Overridepublic boolean equals(Object o) {if (this o) return true;if (o null || getClass() ! o.getClass()) return false;NumberWindow that (NumberWindow) o;return min that.min max that.max;}Overridepublic int hashCode() {return Objects.hash(min, max);}Overridepublic long maxTimestamp() {return Long.MAX_VALUE;} }Window 实现还必须配套一个序列化器主要是实现 两个int变量到窗口对象的转换。 public static class Serializer extends TypeSerializerSingletonNumberWindow {Overridepublic boolean isImmutableType() {return true;}Overridepublic NumberWindow createInstance() {return new NumberWindow(0, 0);}Overridepublic NumberWindow copy(NumberWindow numberWindow) {return numberWindow;}Overridepublic NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {return numberWindow;}Overridepublic int getLength() {return 8;}Overridepublic void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(numberWindow.getMin());dataOutputView.writeInt(numberWindow.getMax());}Overridepublic NumberWindow deserialize(DataInputView dataInputView) throws IOException {return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());}Overridepublic NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {return this.deserialize(dataInputView);}Overridepublic void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {dataOutputView.writeInt(dataInputView.readInt());dataOutputView.writeInt(dataInputView.readInt());}Overridepublic TypeSerializerSnapshotNumberWindow snapshotConfiguration() {return new TimeWindowSerializerSnapshot();}public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshotNumberWindow {public TimeWindowSerializerSnapshot() {super(Serializer::new);}} }自定义WindowAssigner 窗口对象定义好了接下来就是定义窗口分配对象。 简单原则我们把数字划分为三个窗口分别是小数窗口、中位数窗口、大数窗口。 如下示例继承 WindowAssigner 类重写 assignWindows 方法把数字划分到对应的窗口中。 public static class MyWindowAssigner extends WindowAssignerInteger, NumberWindow {private final int startingMedian;private final int startingLarge;public MyWindowAssigner(int startingMedian, int startingLarge) {this.startingMedian startingMedian;this.startingLarge startingLarge;}Overridepublic CollectionNumberWindow assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {// 将数字划分到 小数、中位数、大数 窗口NumberWindow window;if (element startingMedian) {window new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);} else if (element startingLarge) {window new NumberWindow(startingMedian, startingLarge - 1);} else {window new NumberWindow(startingLarge, Integer.MAX_VALUE);}return List.of(window);}Overridepublic TriggerInteger, NumberWindow getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {return null;}Overridepublic TypeSerializerNumberWindow getWindowSerializer(ExecutionConfig executionConfig) {return new NumberWindow.Serializer();}Overridepublic boolean isEventTime() {return false;} }把流程串起来 窗口对象和窗口分配的逻辑都有了接下来就是把整个流程给串起来。 如下示例程序我们定义了一个一秒内生成10个一百以内随机数的数据源Source然后将这些数字流分为一组并为其指定我们自定义的 MyWindowAssigner 窗口分配策略策略中划分了三个窗口数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档根本数字分配对应的窗口。然后我们自定义了 Trigger当窗口内积攒的数字达到十个就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunctionInteger() {Overridepublic void run(SourceContextInteger sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextInt(100));}}Overridepublic void cancel() {}}).keyBy(i - all).window(new MyWindowAssigner(20, 80)).trigger(new TriggerInteger, NumberWindow() {Overridepublic TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {ValueStateInteger countState triggerContext.getPartitionedState(new ValueStateDescriptor(count, Integer.class));Integer count Optional.ofNullable(countState.value()).orElse(0) 1;if (count 10) {countState.update(count);return TriggerResult.CONTINUE;}countState.update(0);return TriggerResult.FIRE_AND_PURGE;}Overridepublic TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}Overridepublic TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {return null;}Overridepublic void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {}}).process(new ProcessWindowFunctionInteger, Object, String, NumberWindow() {Overridepublic void process(String key, ProcessWindowFunctionInteger, Object, String, NumberWindow.Context context, IterableInteger iterable, CollectorObject collector) throws Exception {StringBuilder builder new StringBuilder([ context.window().getMin() - context.window().getMax() ] [);int sum 0;for (Integer value : iterable) {builder.append(value ,);sum value;}builder.append(] sum sum);System.err.println(builder.toString());}});environment.execute(); }运行 Flink 作业控制台输出 [20 - 79] [30,32,24,66,63,37,] sum252 [20 - 79] [71,48,41,55,75,79,] sum369 [80 - 2147483647] [99,90,88,98,85,99,] sum559 [20 - 79] [74,30,56,70,36,78,] sum344尾巴 Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口以便进行有针对性的聚合、计算和分析。 通过合理配置 WindowAssigner我们能够根据时间、数量或自定义的逻辑来划分数据灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理帮助我们从数据中提取有价值的信息和洞察。
http://www.hkea.cn/news/14525884/

相关文章:

  • 财经网站源码 织梦wiki wordpress
  • 企业做网站优劣网络运营者应当为()
  • 网站建设 统一质量标准企业网站优化软件
  • 网站做百度排名教程wordpress文章页特色
  • 厦门海沧建设局网站做网站的步骤 优帮云
  • 网站开发后如何维护seo是什么姓
  • 做网站做的网页开发价格
  • 做关于什么的网站做网站策划书
  • 湘潭高端网站建设如何做公众号微信
  • 做网站需要什么代码网站域名空间地址
  • 英德建设局网站山东省建设科技协会网站
  • 做微课的网站有哪些方面办公室装修设计方案范本
  • 购买的网站平台建设服务计入广州网站制作知名 乐云践新
  • 手机网站制作注意事项婚庆策划公司的商业模式
  • 昌平石家庄网站建设外贸网站建设的好处
  • 如何检测网站是否安全网站建设商务通什么意思
  • 网站集约化建设报告深圳网络推广最新招聘
  • 福州网站设计网址网站服务费网络建设会计分录
  • 买公司的网站建设白头鹰网站一天可以做多少任务
  • 做问卷调查赚钱的网站好泊头在哪做网站比较好
  • 陵园网站建设价格网站建设报价表模板
  • 花店网站建设规划书上海网站建设平台站霸网络
  • 关于网站设计的新闻wordpress4.9主题安装
  • 广东网页制作与网站建设信用卡申请网站建设
  • 汽车html静态网站服务器的做网站空间
  • 附近网站建设公司云搜索引擎入口
  • 国外有做塑料粒子的网站吗使用微信做网站第三方登录
  • wordpress用户ip网站文章优化流程方案
  • 南通网站开发公司可以设计图案的软件
  • 商商业网站建设asp.net获取网站虚拟目录