当前位置: 首页 > news >正文

网站建设业务开展方案北京优化推广

网站建设业务开展方案,北京优化推广,网站域名使用怎么做待摊分录,网站必须做等保合规一、Flink 时间语义类型 Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳Ingestion Time :是数据进入 Flink…

一、Flink 时间语义类型

在这里插入图片描述

  • Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳
  • Ingestion Time :是数据进入 Flink 的时间
  • Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time

二、EventTime 引入

Flink 默认是按照 ProcessingTime 来处理数据的

/**在 Flink 的流式处理中,绝大部分情况推荐使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 Ing estionTime 。使用 EventTime ,需要先引入 EventTime 的时间属性
*/
public class EventTimeTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//引入 EvenetTime//TimeCharacteristic 是一个枚举类,有 ProcessingTime、IngestionTime 和 EventTime 三个属性env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);}
}

三、Watermark

1. 数据乱序情况

在这里插入图片描述

  • 正常情况下,Flink 接收到的事件应该要是按照事件的产生时间 (EventTime) 的先后顺序排列的
  • 实际情况下,事件从产生到进入 source 再到触发 operator,其中间是有一个过程和时间的,而且由于网络、分布式等原因会造成 Flink 接收到的事件的先后顺序不是严格按照事件的 EventTime 顺序排列的,即所谓的乱序数据
  • 乱序数据的问题会造成窗口触发关闭的时间混乱,计算不准确
  • Flink 处理乱序数据的机制:Watermark + allowedLateness + sideOutputLateData

2. Watermark 介绍

  • Watermark 是一种使用延迟触发 window 执行来处理乱序数据的机制
  • 原理:当设置 Watermark = t 时 (即延迟时长为 t),则 Flink 每一次都会获取已经到达的数据中的最大的 EventTime,然后判断 maxEventTime - t 是否等于某一个窗口的触发时间,如果相等则认为属于这个窗口的所有数据都已经到达,这个窗口被触发执行关闭,也可能存在数据丢失
  • 在数据有序的流中,相当于 Watermark = 0,即已经到达的数据中的最大的 EventTime 等于某一个窗口的触发时间,则这个窗口被触发执行关闭
  • 一般将 Watermark 设置为乱序数据流中最大的迟到时间差

3. Watermark 特点和行为

  • 水位线 (Watermark) 是作为一个特殊的数据插入到数据流中的一个标记
  • 水位线 (Watermark) 在 Flink 程序中是一个常量类,有一个时间戳属性,用来表示当前事件时间的进展
  • 水位线 (Watermark) 是基于数据的 EventTime 时间戳生成的
  • 水位线 (Watermark) 的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进

4. Watermark 在任务间的传递

任务并行度不为 1;Watermark 设置的位置越靠近 Source 端越好

在这里插入图片描述

  • 一个任务会接收上游多个并行任务的数据,也会向下游多个并行任务发送数据
  • 从上游多个并行任务接收 Watermark:使用 Partition WM 分别存储接收到的不同分区任务的 Watermark,并以其中最小的 Watermark 作为自己当前的事件时间
  • 向下游多个并行任务发送 Watermark:采取广播的分区策略,向下游的每一个任务都发送一份 Watermark,如果后续 Watermark 没有变更则不会重复发送

5. Watermark 引入

5.1 核心代码
/**方法签名:DataStream.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T>)DataStream.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T>)参数:1.AssignerWithPeriodicWatermarks:继承 TimestampAssigner 接口,周期性的生成 watermark,常用实现类为:BoundedOutOfOrdernessTimestampExtractor 和 AscendingTimestampExtractor2.AssignerWithPunctuatedWatermarks:继承 TimestampAssigner 接口,间断式地生成 watermark
*/
public class WatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//引入 EvenetTime       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<String> dataStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> inputStream = dataStream.map(new MapFunction<SensorReading>() {@Overridepublic SensorReading map(String value) throws Exception {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}});//有序数据设置事件时间戳(毫秒数)和watermark//不需要传递watermark延迟时间,默认是当前事件时间戳 - 1ms 作为watermarkinputStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {@Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//乱序数据设置事件时间戳(毫秒数)和watermark//BoundedOutOfOrdernessTimestampExtractor 构造方法必须传入watermark延迟时间//生成的watermark时间戳 = 当前所有事件的最大时间戳 - 延迟时间inputStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});env.execute();}
}
5.2 AssignerWithPeriodicWatermarks

系统会周期性地生成 watermark 并插入到数据流中,默认周期是 200 毫秒

/**设置watermark生成周期:env.getConfig.setAutoWatermarkInterval(milliseconds);产生watermark的逻辑:每隔 0.2 秒钟,Flink 会调用 AssignerWithPeriodicWatermarks 的 getCurrentWatermark() 方法获取一个时间戳,如果大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的 watermark自定义watermark周期生成器:实现 AssignerWithPeriodicWatermarks 接口,并重写 getCurrentWatermark 和 extractTimestamp 方法
*/
public class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading> {private Long bound = 60 * 1000L;  // watermark延迟时间private Long maxTs = Long.MIN_VALUE;  // 当前最大时间戳@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(maxTs - bound);}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {maxTs = Math.max(maxTs, element.getTimestamp()); //获取当前最大的事件时间戳return element.getTimestamp();}
}
5.3 AssignerWithPunctuatedWatermarks

间断式地生成 watermark,可以根据需要对每条数据进行条件判断筛选来确定是否生成 watermark

public class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReading> {private Long bound = 60 * 1000L;  // 延迟时间@Nullable@Overridepublic Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {if(lastElement.getId().equals("sensor_1")) {return new Watermark(extractedTimestamp - bound);} else {return null;}}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {return element.getTimestamp();}
}

四、EventTime 的 window 操作

1. 滚动时间窗口操作

/**需求:统计 15 秒内的最小温度值,设置 2 秒的延迟
*/
public class TumblingEventTimeWindowTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);/*sensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1sensor_1,1547718207,36.3sensor_1,1547718209,32.8sensor_1,1547718212,37.1...*/DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<SensorReading>() {@Overridepublic SensorReading map(String value) {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//开窗聚合SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).minBy("temperature");minTempStream.print("minTemp");/**输出的结果分析:1.在接收到 sensor_1,1547718212,37.1 时,触发了一个窗口关闭,此时数据的 EventTime 为 1547718212,由于 watermark 延迟时间设置为 2,所以该窗口触发关闭的时间戳为 1547718212 - 2 = 1547718210,该窗口的范围为 [1547718195,1547718210)2.当前第一个窗口是 [1547718195,1547718210),其起始点的确定规则为:2.1 滚动时间窗口使用的窗口分配器为 TumblingEventTimeWindows 类2.2 TumblingEventTimeWindows 的 assignWindows 方法中调用 getWindowStartWithOffset 方法获取起始点2.3 getWindowStartWithOffset(timestamp, offset, windowSize):方法逻辑为 timestamp - (timestamp - offset + windowSize) % windowSize,默认 offset 为 0,所以最终得到的起始点应该是 windowSize 的整数倍,在本例中的起始点为 1547718199 - (1547718199-0+15)%15 = 15477181953.偏移量 offset:一般是用来处理不同时区的数据*/env.execute();}   
}

2. 迟到数据处理

/**需求:统计 15 秒内的最小温度值,设置 2 秒的延迟,并允许 1 分钟的迟到数据,1 分钟后的数据写入侧输出流
*/
public class TumblingEventTimeWindowDelayTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<SensorReading>() {@Overridepublic SensorReading map(String value) {String[] fields = value.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late"){};//开窗聚合SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1));.sideOutputLateData(outputTag).minBy("temperature");minTempStream.print("minTemp");minTempStream.getSideOutput(outputTag).print("late");/**依次输入数据:sensor_1,1547718199,35.8sensor_1,1547718206,36.3sensor_1,1547718210,34.7sensor_1,1547718211,31sensor_1,1547718209,34.9sensor_1,1547718212,37.1sensor_1,1547718213,33sensor_1,1547718206,34.2sensor_1,1547718202,36...sensor_1,1547718272,34sensor_1,1547718203,30.6输出的结果分析:1.在接收到 sensor_1,1547718212,37.1 时,触发 [1547718195,1547718210) 窗口执行,此时输出数据 sensor_1,1547718209,34.9,此时 2 秒内的延迟数据能被处理  2.在接收到 sensor_1,1547718206,34.2 时,由于设置了允许 1 分钟迟到,所以 [1547718195,1547718210) 窗口仍然没有关闭,此时会更新数据为 sensor_1,1547718206,34.2,此时的系统时间戳为 1547718213 - 2 = 1547718211 - 1547718210 < 603.在接收到 sensor_1,1547718202,36 时,[1547718195,1547718210) 窗口仍然会更新输出一次数据 sensor_1,1547718206,34.24.在接收到 sensor_1,1547718272,34 时,属于 [1547718210,1547718225) 窗口的数据会输出 sensor_1,1547718211,31,此时的系统时间戳为 1547718272 - 2 = 1547718270,由于 1547718270 - 1547718210 >= 60,所以 [1547718195,1547718210) 窗口会真正的关闭5.在之后接收到 sensor_1,1547718203,30.6 时,会把数据输出到侧输出流中*/env.execute();}   
}
http://www.hkea.cn/news/679984/

相关文章:

  • 业主装修日记那个网站做的好片多多可以免费看电视剧吗
  • 租车网站建设站长之家源码
  • 昌吉州回族自治州建设局网站地产渠道12种拓客方式
  • 北京市网站公司网络项目免费的资源网
  • 电子商务网站规划、电子商务网站建设站长工具 忘忧草
  • 凡科建网关键词优化公司哪家好
  • seo排名推广工具seo公司多少钱
  • 做视频网站赚钱怎么在百度上推广自己的公司信息
  • 网站建设凡科厦门网站建设平台
  • 互联网行业pest分析福州百度快速优化排名
  • 做网站的接私活犯法吗如何对网站进行推广
  • 身高差效果图网站优化师和运营区别
  • 谷歌wordpress建站搜索引擎算法
  • .net 购物网站开发源代码发布信息的免费平台
  • 自己做一网站大学生网络营销策划书
  • 关于网站建设的文章百度域名收录提交入口
  • 国人在线做网站推广图片大全
  • 郑州网站建设七彩科技四年级说新闻2023
  • 在什么网站上做自媒体seo整站怎么优化
  • 网站开发要注意安全性公司优化是什么意思
  • 河北邢台做移动网站开通网站需要多少钱
  • 天河网站建设多少钱淘宝关键词优化
  • 中型网站 收益关键词排名查询官网
  • 网站的弹窗是怎么做的谈谈对seo的理解
  • 广州网站制作费用宁波seo外包哪个品牌好
  • 河南高端网站建设广州网站优化页面
  • 企业可以备案几个网站南昌seo实用技巧
  • 网站用什么布局专业网站建设公司
  • 公司网站怎么做分录it培训机构学费一般多少
  • 如何将自己做的网页做成网站绍兴seo