当前位置: 首页 > news >正文

那个网站做外贸好百度seo怎么做

那个网站做外贸好,百度seo怎么做,网站封面怎么做,网站建设四个阶段的流程这篇文章主要介绍协同分组coGroup的使用,先讲解API代码模板,后面会结图解介绍coGroup是如何将流中数据进行分组的. 1 API介绍 数据源# 左流数据 ➜ ~ nc -lk 6666 101,Tom 102,小明 103,小黑 104,张强 105,Ken 106,GG小日子 107,小花 108,赵宣艺 109,明亮# 右流数据 ➜ ~ n…

这篇文章主要介绍协同分组coGroup的使用,先讲解API代码模板,后面会结图解介绍coGroup是如何将流中数据进行分组的.

1 API介绍

  • 数据源
    # 左流数据
    ➜  ~ nc -lk 6666
    101,Tom
    102,小明
    103,小黑
    104,张强
    105,Ken
    106,GG小日子
    107,小花
    108,赵宣艺
    109,明亮
    # 右流数据
    ➜  ~ nc -lk 7777
    101,,本科,程序员
    102,,本科,程序员
    103,,本科,会计
    104,,大专,安全工程师
    105,,硕士,律师
    106,未知,小本,挖粪使者
    108,,本科,人事
    110,,本科,算法工程师
  • 代码
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/10* @Description: 协同分组**/
    public class FlinkCoGroup {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(2);// 数据源1(socket数据源),为了方便测试,根据实际情况自行选择DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 6666);// 将数据进行切分返回Tuple2(id,name)SingleOutputStreamOperator<Tuple2<String, String>> mapStream1 = sourceStream1.map(value -> {String[] split = value.split(",");return Tuple2.of(split[0], split[1]);}).returns(new TypeHint<Tuple2<String, String>>() {});// 数据源2(socket数据源),为了方便测试,根据实际情况自行选择DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 7777);// 将数据进行切分返回Tuple4(id,gender,education,job)SingleOutputStreamOperator<Tuple4<String, String, String, String>> mapStream2 = sourceStream2.map(value -> {String[] split = value.split(",");return Tuple4.of(split[0], split[1], split[2], split[3]);}).returns(new TypeHint<Tuple4<String, String, String, String>>() {});// 数据流协同DataStream<Tuple4<String, String, String, String>> coGrouped = mapStream1.coGroup(mapStream2).where(tup -> tup.f0) // 左流协同分组字段(mapStream1).equalTo(tup -> tup.f0) // 右流协同分组字段(mapStream2).window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 开窗口,以处理时间划分(每20秒一个窗口).apply(new CoGroupFunction<Tuple2<String, String>, Tuple4<String, String, String, String>, Tuple4<String, String, String, String>>() {@Overridepublic void coGroup(Iterable<Tuple2<String, String>> first, Iterable<Tuple4<String, String, String, String>> second, Collector<Tuple4<String, String, String, String>> out) throws Exception {/***first 代表左流的迭代器* second 代表右流的迭代器* out 则是返回的数据形式* 具体方法中两个迭代器存数据的原理后续会通过图结合进行解析**/// 这里的逻辑模拟sql中left join// 遍历左流数据(first)for (Tuple2<String, String> left : first) {// 定义右流是否为NULL判断标识boolean flag = false;// 遍历右流数据(second)for (Tuple4<String, String, String, String> right : second) {// 返回left(id, name) + right(gender, education)Tuple4<String, String, String, String> tup4 = Tuple4.of(left.f0, left.f1, right.f1, right.f2);// 输出out.collect(tup4);// 修改判断标识flag = true;}// 如果右流为NULL,则输出左流的数据if (!flag) {// 这里用字符串"NULL"代替null值,方便观察Tuple4<String, String, String, String> tup4 = Tuple4.of(left.f0, left.f1, "NULL", "NULL");// 输出out.collect(tup4);}}}});// 打印结果coGrouped.print();env.execute("Flink CoGroup");}
    }
    
  • 结果
    2> (102,小明,男,本科)
    1> (106,GG小日子,未知,小本)
    2> (109,明亮,NULL,NULL)
    1> (107,小花,NULL,NULL)
    2> (105,Ken,男,硕士)
    2> (103,小黑,女,本科)
    2> (101,Tom,男,本科)
    2> (108,赵宣艺,女,本科)
    2> (104,张强,男,大专)
    
    从数据源和结果数据可以看到和代码逻辑是完全吻合的.

2 原理解析

我这我们先看一下图解,如下

在这里插入图片描述

  • 无界转有界
    在代码中我们开启window,这也是使用coGroup的必要条件,开启window后实际上就是将我们原本的无界数据流转变成一个以20S为界限的有界数据流.
  • 迭代器分组
    将数据进入到窗口内后,就会根据经我们前面设定的条件也就是.where.equalTo中的内容将mapStream1mapStream2中的数据根据key进行分组存储到不同的iterator中.
  • 逻辑计算
    上面已经将数据根据key都存储到iterator中了,这里就会根据我们在new CoGroupFunction<...>(){...}中的写的逻辑将mapStream1mapStream2中具有相同keyiterator进行计算.
  • 输出
    当一个window结束后,就会将数据按照计算后的结果(在代码中就是Tuple4<String, String, String, String>)输出到下游.
http://www.hkea.cn/news/475906/

相关文章:

  • 从客户—管理者为某一公司做一份电子商务网站管理与维护的方案自媒体是如何赚钱的
  • 黑龙江住房和城乡建设厅网站首页每日精选12条新闻
  • 做网站工作都包括什么企业网站搭建
  • 自己可以进行网站建设吗河北网站推广
  • 网站建设与管理论文seo整站怎么优化
  • 西安做网站收费价格网站流量监控
  • 福州网站制作有限公司南京疫情最新情况
  • 国外品牌设计网站天津疫情最新消息
  • 宁波有做网站的地方吗seo报价单
  • 深圳企业网站开发中国法律服务网app最新下载
  • 大连企业网站建站国外域名注册网站
  • 站长工具seo综合查询权重百度在线搜索
  • 伊犁网站建设评价怎样才能上百度
  • 房地产网站建设方案百度实名认证
  • 做外贸可以在哪些网站注册网络项目免费的资源网
  • 中国建设银行信用卡网站首页青岛关键词优化平台
  • 阿里云网站建设考试题目长沙网站推广服务公司
  • 甘肃建设项目审批权限网站俄罗斯搜索引擎yandex官网入口
  • 网站建设公司新员工培训ppt模板百度热门搜索排行榜
  • 仿魔客吧网站模板网址大全是ie浏览器吗
  • 网站产品后台界面怎么做湖南关键词排名推广
  • 网站数据每隔几秒切换怎么做的湖南百度seo排名点击软件
  • 网站制作先学什么百度新闻下载安装
  • 河南省网站建设哪家好免费观看行情软件网站进入
  • 粘合剂东莞网站建设体育热点新闻
  • 百度网站排名关键词整站优化培训网站建设
  • 网络平台代理seo外包 杭州
  • 东方头条网站源码免费推广软件工具
  • 北京网站建设公司分享网站改版注意事项流程优化四个方法
  • 案例学 网页设计与网站建设手机百度seo快速排名