当前位置: 首页 > news >正文

高端大气企业网站模板在线申请

高端大气企业网站模板,在线申请,阿里云二级域名网站怎么建设,wordpress 导入图片文章目录 1、水位线的生成原则2、有序流内置水位线3、乱序流内置水位线4、自定义周期性水位线生成器5、自定义断点式水位线生成器6、从数据源中发送水位线 1、水位线的生成原则 水位线出现#xff0c;即代表这个时间之前的数据已经全部到齐#xff0c;之后不会再出现之前的数… 文章目录 1、水位线的生成原则2、有序流内置水位线3、乱序流内置水位线4、自定义周期性水位线生成器5、自定义断点式水位线生成器6、从数据源中发送水位线 1、水位线的生成原则 水位线出现即代表这个时间之前的数据已经全部到齐之后不会再出现之前的数据了。参考前面的乱序流可以得出 想要保证数据绝对正确就得加足够大的延迟但实时性就没保障了想要实时性强就得把延迟设置小但此时迟到数据可能遗漏准确性降低 水位线的定义是对低延迟和结果准确性的一个权衡。Flink生成水位线的方法是.assignTimestampsAndWatermarks()它主要用来为流中的数据分配时间戳并生成水位线来指示事件时间 DataStreamEvent stream env.addSource(xxx);DataStream withTimestampsAndWatermarks stream.assignTimestampsAndWatermarks(WatermarkStrategy对象); WatermarkStrategy是一个接口包含了一个时间戳分配器TimestampAssigner和一个水位线生成WatermarkGenerator public interface WatermarkStrategyT extends TimestampAssignerSupplierT,WatermarkGeneratorSupplierT{// 负责从流中数据元素的某个字段中提取时间戳并分配给元素。时间戳的分配是生成水位线的基础。OverrideTimestampAssignerT createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式基于时间戳生成水位线OverrideWatermarkGeneratorT createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); } 2、有序流内置水位线 有序流的时间戳全部单调递增没有迟到数据直接WatermarkStrategy.forMonotonousTimestamps()就可以拿到WatermarkStrategy对象 public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成升序的watermark没有等待时间.WaterSensorforMonotonousTimestamps()// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {System.out.println(数据 element ,recordTs recordTimestamp);// 返回的时间戳要毫秒这里拿自定义对象的ts属性做为时间戳return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用事件时间语义的窗口别再用处理时间TumblingProcessTime.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();} } 执行下输入10时逻辑时钟被推到了10s到达区间触发窗口执行全窗口函数的process输出当前窗口的数据 3、乱序流内置水位线 调用WatermarkStrategy. forBoundedOutOfOrderness()传入延迟时间 public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成乱序的等待3s.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();} } 执行 简单分析下结果 第一条数据s111进来创建窗口水位线为1s-3s延迟3s)s11010进来水位线为10-3 7s还未到达10窗口不触发若是有序流无等待下此时窗口已被触发了此时进来一条乱序数据比如s1666-33s水位线保持上面的7不变watermark不会推进且6这条数据也会被统计在[0,10)的区间内s11111进来11-38也不会触发但这条数据是属于[10,20)区间的那个桶的s11313进来达到10窗口触发 4、自定义周期性水位线生成器 上面只是定义了时间戳的提取逻辑水位线的生成采用的默认内置策略。接下来自定义水位线生成器周期性水位生成器。 周期性生成器是通过onEvent()观察判断输入的事件而在onPeriodicEmit()里发射生成的水位线 // 自定义水位线的产生 public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(node01, 9527).map(new WaterSensorMapFunction());// 定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成器.WaterSensorforGenerator(context - MyPeriodWatermarkGenerator(3000L))// 1.2 指定时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();}} 模仿前面的内置生成器定义自己的水位线生成器 public class MyPeroidWatermarkGenerator implements WatermarkGeneratorEvent {private Long delayTime 5000L; // 延迟时间private Long maxTs -Long.MAX_VALUE delayTime 1L; // 观察到的最大时间戳//构造方法传入延迟时间构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime delayTime;this.maxTs Long.MIN_VALUE this.delayTime 1;}/*** 每条数据进来都调用一次用来提取最大的事件事件*/Overridepublic void onEvent(Event eventlong eventTimestampWatermarkOutput output) {// 每来一条数据就调用一次maxTs Math.max(event.timestampmaxTs); // 更新最大时间戳System.out.println(调用了onEvent方法获取目前为止最大的时间戳 maxTimestamp);}/*** 周期性调用默认20ms*/Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System,out,println(调用了onPeriodicEmit方法,生成watermark (maxTimestamp - delayTs - 1) );}}核心部分指定水位线生成器的Lamdba表达式展开就是 运行 数据没进来前每200ms调用一次发射水位线的方法此时的水位线是构造方法里Long.MIN_VALUE那个进来一条数据调用onEvent最大时间戳被更新到周期后再发射水位线maxTs-delayTs-1继续周期性调用onPeriodicEmit方法 onPeriodicEmit()里调用output.emitWatermark()就可以发出水位线了这个方法由系统框架周期性地调用默认200ms一次 修改默认的周期比如改为400ms env.getConfig().setAutoWatermarkInterval(400L);5、自定义断点式水位线生成器 断点式生成器会不停地检测onEvent()中的事件发现带有水位线信息的当事件时就立即发出水位线。改下代码定义水位线生成器 public class PointWatermarkGenerator implements WatermarkGeneratorEvent {private Long delayTime 5000L; // 延迟时间private Long maxTs -Long.MAX_VALUE delayTime 1L; // 观察到的最大时间戳//构造方法传入延迟时间构造水位线生成器对象public MyPeroidWatermarkGenerator(long delayTime){this.delayTime delayTime;this.maxTs Long.MIN_VALUE this.delayTime 1;}/*** 每条数据进来都调用一次用来提取最大的事件事件*/Overridepublic void onEvent(Event eventlong eventTimestampWatermarkOutput output) {// 每来一条数据就调用一次maxTs Math.max(event.timestampmaxTs); // 更新最大时间戳// 发射水位线output.emitWatermark(new Watermark(maxTs - delayTime - 1L));System.out.println(调用了onEvent方法获取目前为止最大的时间戳 maxTimestamp ,生成watermark (maxTimestamp - delayTs - 1));}/*** 周期性调用默认20ms*/Overridepublic void onPeriodicEmit(WatermarkOutput output) {}}周期性代码改为 //...// 定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成器.WaterSensorforGenerator(context - PointWatermarkGenerator(3000L))// 1.2 指定时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒return element.getTs() * 1000L;}); 运行此时不再周期性的发射水位线 6、从数据源中发送水位线 在自定义的数据源中抽取事件时间然后发送水位线 env.fromSource( kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource )//注意fromSorce方法的第二个传参之前用的WatermarkStrategy.noWatermark() 注意此时不用再assignTimestampsAndWatermarks了在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一
http://www.hkea.cn/news/14559238/

相关文章:

  • 平顶山企业网站建设大连网站建设价格
  • 河北做网站网站后台管理无法编辑
  • 普陀网站制作网络营销公司有哪些公司
  • 做网站主流用什么语言域名申请后怎么建网站
  • 上海网站建设过程河南郑州最新消息今天
  • 旅游seo整站优化网站开发怎么进行数据库连接
  • 山东广饶建设银行网站专业做运动服装的网站
  • 网站建设沟通准备wordpress后台配置文件
  • 中山哪里做网站西安哪家公司制作响应式网站建设
  • 网站没有做实名认证软件开发专业属于哪个专业大类
  • 网站头部 标签软件开发工具性能审计不包括
  • 网站开发报价 福州wordpress 顶部美化
  • 网站建设类论文格式树莓派lamp WordPress
  • 淄博网站建设方案优化落实防控措施
  • 网站内容建设与管理wordpress字体调整
  • 网站包括哪些主要内容网络推广平台排行榜
  • 帝国cms添加网站地图网站的维护怎么做
  • 地方行业网站徐州做网站的公司哪家好
  • 网站开发需要团队网站备案号去哪查询
  • 广州企业网站建设公司哪家好网站 文件服务器
  • 公司门户网站首页买机票便宜网站建设
  • 网站如何做的有特色在linux上做网站搭建
  • c# asp.net网站开发书网络营销主要做些什么
  • 网站建设甲方给乙方的需求方案电子商务网站建设与管理实训报告
  • 在线课程软件网站建设费用网站制作公司兴田德润怎么联系
  • 在线网站软件免费下载安装重庆玻璃制作厂家
  • 公司做网站留言板商城服务是什么平台
  • wui网站建设开源免费cms可商业用
  • 网站建设帮助中心哈尔滨建立网站公司
  • 图书馆 网站开发 总结c 做的网站怎么上传图片