当前位置: 首页 > news >正文

外贸专业网站制作老薛主机做电影网站

外贸专业网站制作,老薛主机做电影网站,需要手机端网站建设的企业,智能小程序开发报价文章目录 8.1 窗口联结#xff08;Window Join#xff09;8.2 **间隔联结#xff08;Interval Join#xff09;** 8.1 窗口联结#xff08;Window Join#xff09; Flink为基于一段时间的双流合并专门提供了一个窗口联结算子#xff0c;可以定义时间窗口#xff0c;并… 文章目录 8.1 窗口联结Window Join8.2 **间隔联结Interval Join** 8.1 窗口联结Window Join Flink为基于一段时间的双流合并专门提供了一个窗口联结算子可以定义时间窗口并将两条流中共享一个公共键key的数据放在窗口中进行配对处理。 package org.example.watermark;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple2String, Integer ds1 env.fromElements(Tuple2.of(a, 1),Tuple2.of(a, 2),Tuple2.of(b, 3),Tuple2.of(c, 4)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));SingleOutputStreamOperatorTuple3String, Integer, Integer ds2 env.fromElements(Tuple3.of(a, 1, 1),Tuple3.of(a, 11, 1),Tuple3.of(b, 2, 1),Tuple3.of(b, 12, 1),Tuple3.of(c, 14, 1),Tuple3.of(d, 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));// TODO window join// 1. 落在同一个时间窗口范围内才能匹配// 2. 根据keyby的key来进行匹配关联// 3. 只能拿到匹配上的数据类似有固定时间范围的inner joinDataStreamString join ds1.join(ds2).where(r1 - r1.f0) // ds1的keyby.equalTo(r2 - r2.f0) // ds2的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/*** 关联上的数据调用join方法* param first ds1的数据* param second ds2的数据* return* throws Exception*/Overridepublic String join(Tuple2String, Integer first, Tuple3String, Integer, Integer second) throws Exception {return first ----- second;}});join.print();env.execute();} }其实仔细观察可以发现窗口join的调用语法和我们熟悉的SQL中表的join非常相似 SELECT * FROM table1 t1, table2 t2 WHERE t1.id t2.id; 这句SQL中where子句的表达等价于inner join … on所以本身表示的是两张表基于id的“内连接”inner join。而Flink中的window join同样类似于inner join。也就是说最后处理输出的只有两条流中数据按key配对成功的那些如果某个窗口中一条流的数据没有任何另一条流的数据匹配那么就不会调用JoinFunction的.join()方法也就没有任何输出了。 8.2 间隔联结Interval Join 间隔联结的原理 间隔联结具体的定义方式是我们给定两个时间点分别叫作间隔的“上界”upperBound和“下界”lowerBound于是对于一条流不妨叫作A中的任意一个数据元素a就可以开辟一段时间间隔[a.timestamp lowerBound, a.timestamp upperBound],即以a的时间戳为中心下至下界点、上至上界点的一个闭区间我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流不妨叫B中的数据元素b如果它的时间戳落在了这个区间范围内a和b就可以成功配对进而进行计算输出结果。所以匹配的条件为 a.timestamp lowerBound b.timestamp a.timestamp upperBound 这里需要注意做间隔联结的两条流A和B也必须基于相同的key下界lowerBound应该小于等于上界upperBound两者都可正可负间隔联结目前只支持事件时间语义。 如下图所示我们可以清楚地看到间隔联结的方式 下方的流A去间隔联结上方的流B所以基于A的每个数据元素都可以开辟一个间隔区间。我们这里设置下界为-2毫秒上界为1毫秒。于是对于时间戳为2的A中元素它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内所以就可以得到匹配数据对2, 0和2, 1。同样地A中时间戳为3的元素可匹配区间为[1, 4]B中只有时间戳为1的一个数据可以匹配于是得到匹配数据对3, 1)。 所以我们可以看到间隔联结同样是一种内连接inner join。与窗口联结不同的是interval join做匹配的时间段是基于流中数据的所以并不确定而且流B中的数据可以不只在一个区间内被匹配。 stream1.keyBy(KeySelector).intervalJoin(stream2.keyBy(KeySelector)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunctionInteger, Integer, String(){Overridepublic void processElement(Integer left, Integer right, Context ctx, CollectorString out) {out.collect(left , right);}});处理迟到数据 //2. 调用 interval join OutputTagTuple2String, Integer ks1LateTag new OutputTag(ks1-late, Types.TUPLE(Types.STRING, Types.INT)); OutputTagTuple3String, Integer, Integer ks2LateTag new OutputTag(ks2-late, Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperatorString process ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).sideOutputLeftLateData(ks1LateTag) // 将 ks1的迟到数据放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据放入侧输出流.process(new ProcessJoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/*** 两条流的数据匹配上才会调用这个方法* param left ks1的数据* param right ks2的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement(Tuple2String, Integer left, Tuple3String, Integer, Integer right, Context ctx, CollectorString out) throws Exception {// 进入这个方法是关联上的数据out.collect(left ------ right);}});process.print(主流); process.getSideOutput(ks1LateTag).printToErr(ks1迟到数据); process.getSideOutput(ks2LateTag).printToErr(ks2迟到数据);env.execute();
http://www.hkea.cn/news/14531721/

相关文章:

  • 网站企业快速备案怎么利用网站做兼职
  • 做网站的都改行做什么了基于android的app开发步骤
  • 厦门建网站多少钱画册设计步骤
  • wordpress手机端跳转seo描述是写什么
  • 人防网站建设质量好网站建设价格
  • 教你做兼职的网站如何建设网站和app
  • 企业网站开发需求分析找装修公司电话
  • 厦门网站建设公司哪家好网页设计属于什么行业
  • 海口旅游类网站建设做盗版视频网站成本多少
  • 西安网站制作优化作品设计方案怎么写
  • 软件发布流程兰州网站的优化
  • 58同城网站建设案例做高仿表网站容易被k吗
  • 不需要写代码的网站开发软件网站开发整合套件
  • 新建网站怎么保存网站开发的项目
  • 哈尔滨开发网站WordPress第三方注册
  • 有哪些做画册的网站手机网站制作移动高端网站建设
  • 旅游主题网站策划书wordpress 注册登录插件
  • 电脑上做网站的软件空间设计师工资一般多少
  • 给别人做网站 网站违法了济南外贸网站推广
  • 百度收录提交申请网站网站建设不完整 审核
  • 下载官方购物网站百度官网入口链接
  • 上海网站开发服务商网站建设框架模板下载
  • 关于网站开发的网站内容页怎么设计模板
  • 建站公司获客成本网站和微网站
  • 东莞工业品网站建设网站设计服务平台
  • 新乡网站推广公司如何设计旅游网站
  • 网站怎么做认证织梦发布网站
  • 网站301重定向代码企业采购
  • 如何做好网站关键词优化云南建设银行官方网站
  • 彩票网站建设开发做爰片在线看网站