当前位置: 首页 > news >正文

南通网站排名团队如何查看一个网站做的外链

南通网站排名团队,如何查看一个网站做的外链,子网站域名ip地址查询,青岛酒巢网络科技有限公司在离线 Hive 中#xff0c;我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢#xff1f;Flink DataStream API 为我们提供了3个算子来实现双流 join#xff0c;分别是#xff1a; join coGroup intervalJoin 下面我们分别详细看一下这…在离线 Hive 中我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢Flink DataStream API 为我们提供了3个算子来实现双流 join分别是 join coGroup intervalJoin 下面我们分别详细看一下这3个算子是如何实现双流 Join 的。 1. Join Joining | Apache Flink Join 算子提供的语义为 “Window join”即按照指定字段和滚动/滑动/会话窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。 Join 可以支持处理时间和事件时间两种时间特征。 Join 通用用法如下 stream.join(otherStream) .where(KeySelector) .equalTo(KeySelector) .window(WindowAssigner) .apply(JoinFunction) Join 语义类似与离线 Hive 的 InnnerJoin (内连接)这意味着如果一个流中的元素在另一个流中没有相对应的元素则不会输出该元素。 下面我们看一下 Join 算子在不同类型窗口上的具体表现。 1.1 滚动窗口Join 当在滚动窗口上进行 Join 时所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。 如上图所示我们定义了一个大小为 2 秒的滚动窗口最终产生 [0,1][2,3]… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是在滚动窗口 [6,7] 中由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素因此该窗口不会输出任何内容。 下面我们一起看一下如何实现上图所示的滚动窗口 Join :::color3 可以通过两个socket流将数据合并为一个三元组key,value1,value2 代码演示 import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Arrays; import java.util.Date; public class _ShuangLiuJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来因为本地的并行度是16只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String greenStream env.socketTextStream(localhost, 8888).map(new MapFunctionString, Tuple3String, Integer, String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(绿色 Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(绿色的时间timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用因为kafka占用这个端口 key,0,2021-03-26 12:09:00DataStreamTuple3String, Integer, String orangeStream env.socketTextStream(localhost, 7777).map(new MapFunctionString, Tuple3String,Integer,String() {Overridepublic Tuple3String, Integer, String map(String line) throws Exception {String[] arr line.split(,);System.out.println(橘色 Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, StringforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssignerTuple3String, Integer, String() {Overridepublic long extractTimestamp(Tuple3String, Integer, String element, long recordTimestamp) {Long timeStamp 0L;SimpleDateFormat simpleDateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Date date null;try {date simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp date.getTime();System.out.println(橘色的时间timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream greenStream.join(orangeStream).where(tup3 - tup3.f0).equalTo(tup3 - tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunctionTuple3String, Integer, String, Tuple3String, Integer, String, Tuple3String, Integer, Integer() {Overridepublic Tuple3String, Integer, Integer join(Tuple3String, Integer, String t1, Tuple3String, Integer, String t2) throws Exception {System.out.println(t1.f2);System.out.println(t2.f2);return Tuple3.of(t1.f0, t1.f1, t2.f1);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();} } 总结非常重要 1 要想测试这个效果需要将并行度设置为1 2窗口中数据的打印是需要触发的没有触发的数据窗口内是不会进行计算的所以记得输入触发的数据。 假如使用了EventTime 作为时间语义不管是窗口开始和结束时间还是触发的条件都跟系统时间没有关系而跟输入的数据有关系举例 假如你的第一条数据是key,0,2021-03-26 12:09:01 窗口的大小是5s水印是3秒 窗口的开始时间为 2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 触发时间是2021-03-26 12:09:08 为什么呢 水印时间 结束时间 水印时间是2021-03-26 12:09:08 - 3 2021-03-26 12:09:05 2021-03-26 12:09:05 ::: 如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略设置100毫秒的最大可容忍的延迟时间同时也会为流分配事件时间戳。假设输入流为 格式两条流输入元素如下所示 绿色流 key,0,2021-03-26 12:09:00 key,1,2021-03-26 12:09:01 key,2,2021-03-26 12:09:02 key,4,2021-03-26 12:09:04 key,5,2021-03-26 12:09:05 key,8,2021-03-26 12:09:08 key,9,2021-03-26 12:09:09 key,11,2021-03-26 12:09:11 ​ 橘色流 key,0,2021-03-26 12:09:00 key,1,2021-03-26 12:09:01 key,2,2021-03-26 12:09:02 key,3,2021-03-26 12:09:03 key,4,2021-03-26 12:09:04 key,6,2021-03-26 12:09:06 key,7,2021-03-26 12:09:07 key,11,2021-03-26 12:09:11 1.2 滑动窗口Join [解释一下即 ] 当在滑动窗口上进行 Join 时所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联并最终传递到 JoinFunction 进行处理。 如上图所示我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是一个元素可能会落在不同的窗口中因此会在不同窗口中发生关联例如绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素则不会输出该元素。
http://www.hkea.cn/news/14591136/

相关文章:

  • 牛商网营销型网站多少钱企业网站建设方案 word
  • 深圳网站建设方案书上海高玩seo
  • 大学 英文网站建设网站如何建设目录结构
  • 速贝cms建站系统温州网站升级
  • 山东济南公司网站韩雪冬网站
  • 有网站代码 如何建设网站深圳做外贸网站公司
  • wordpress站点统计插件山西建设公司网站
  • 徐州网站建设哪家专业河北建设厅网站首页
  • 商城网站建设排名iis网站服务器安全隐患
  • asp连接数据库做登录网站完整下载厦门正规的网站建设公司
  • 网站编辑工具购物网站建设与开发
  • 论坛网站太难做没人在线高清观看免费ppt
  • 湖南外贸网站建设山东东营市房价
  • 网站制作好如何上线网站用的字体
  • 网站哪些数据数字营销经理岗位职责
  • 网站内的搜索怎么做的免费网站建设apk
  • 苏州专业高端网站建设公司哪家好梦幻创意北京网站建设
  • 中国建设网站轨道自检验收报告表动漫设计公司
  • 大气学校网站模板做百度线上推广
  • 怎样制作网站教程外贸公司网站建设哪家好
  • 网站怎么产品做推广怎么修改错误 wordpress
  • 沧州南皮手机网站建设灰色词seo排名
  • 合肥教育平台网站建设《网站设计与建设》电子书
  • 城市建设投资公司网站软件是如何开发的
  • 网站运营方案模板wordpress页面调试分类文章
  • 哪个网站做logo设计九天智能建站软件
  • 网站系统平台建设如何建设简易网站
  • 简洁大气的网站首页事务所网站制作方案
  • 甘肃网站建设推广wordpress title description
  • 河南省建设厅网站建设领域涉黑阿里网站备案管理系统