当前位置: 首页 > news >正文

百度广告联盟看广告赚钱企业优化方案

百度广告联盟看广告赚钱,企业优化方案,网站建设发展方向及前景展望,seo月薪一、Transform 转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 二、基本转换算子 2.1、map#xff08;映射#xff09; 将数据流中的数据进行转换, 形成新的数据流#xff0c;消费一个元素并产出一个元素…一、Transform 转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 二、基本转换算子 2.1、map映射 将数据流中的数据进行转换, 形成新的数据流消费一个元素并产出一个元素 package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource;public class Transform_map {public static void main(String[] args) throws Exception {ExecutionEnvironment svn ExecutionEnvironment.getExecutionEnvironment();DataSourceInteger s_num svn.fromElements(1, 2, 3, 4, 5);s_num.map(new MapFunctionInteger, Integer() {Overridepublic Integer map(Integer values) throws Exception {return values*values;}}).print();} }2.2、filter过滤 根据指定的规则将满足条件true的数据保留不满足条件(false)的数据丢弃 package com.lyh.flink05;import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource;public class Transform_filter {public static void main(String[] args) throws Exception {ExecutionEnvironment svn ExecutionEnvironment.getExecutionEnvironment();DataSourceInteger elements svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9); // elements.filter(new FilterFunctionInteger() { // Override // public boolean filter(Integer integer) throws Exception { // if (integer % 2 0 ) // return false; // else { // return true; // } // } // }).print();elements.filter(value - value % 2 ! 0).print();} }2.3、flatMap扁平映射 消费一个元素并产生零个或多个元素 package com.lyh.flink05;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.util.Collector;import java.util.concurrent.ExecutionException;public class flatMap {public static void main(String[] args) throws Exception {ExecutionEnvironment svn ExecutionEnvironment.getExecutionEnvironment();DataSourceInteger dataSource svn.fromElements(1, 2, 3);dataSource.flatMap(new FlatMapFunctionInteger, Integer() {Overridepublic void flatMap(Integer integer, CollectorInteger collector) throws Exception {collector.collect(integer*integer);collector.collect(integer*integer*integer);}}).print();} }三、聚合算子 3.1、keyBy按键分区 把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中 package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.map((MapFunctionTuple2Long, Long, String) longLongTuple2 - key: longLongTuple2.f0 ,value: longLongTuple2.f1).print();env.execute(execute);} }3.2、sum,min,max,minBy,maxBy简单聚合 KeyedStream的每一个支流做聚合。执行完成后会将聚合的结果合成一个流返回所以结果都是DataStream package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.sum(1).print();env.execute(execute);} }3.3、reduce归约聚合 一个分组数据流的聚合操作合并当前的元素和上次聚合的结果产生一个新的值返回的流中包含每一次聚合的结果而不是只返回最后一次聚合的最终结果。 package com.lyh.flink05;import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyByReduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L)).keyBy(0).reduce(new ReduceFunctionTuple2Long, Long() {Overridepublic Tuple2Long, Long reduce(Tuple2Long, Long values1, Tuple2Long, Long values2) throws Exception {return new Tuple2(values1.f0,values1.f1values2.f1);}}).print();env.execute();} }3.4、process底层处理 process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身)process 用法比较灵活后面再做专门研究。 package com.lyh.flink05; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;public class Process_s {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger streamSource env.fromElements(1, 2, 3, 4, 5);SingleOutputStreamOperatorInteger processed streamSource.process(new ProcessFunctionInteger, Integer() {Overridepublic void processElement(Integer value, Context ctx, CollectorInteger out) throws Exception {if (value % 3 0) {//测流数据ctx.output(new OutputTagInteger(3%0, TypeInformation.of(Integer.class)), value);}if (value % 3 1) {//测流数据ctx.output(new OutputTagInteger(3%1, TypeInformation.of(Integer.class)), value);}//主流 ,数据out.collect(value);}});DataStreamInteger output0 processed.getSideOutput(new OutputTag(3%0,TypeInformation.of(Integer.class)));DataStreamInteger output1 processed.getSideOutput(new OutputTag(3%1,TypeInformation.of(Integer.class)));output1.print();env.execute();} }四、合流算子 4.1、connect连接 在某些情况下我们需要将两个不同来源的数据流进行连接实现数据匹配比如订单支付和第三方交易信息这两个信息的数据就来自于不同数据源连接后将订单支付和第三方交易信息进行对账此时才能算真正的支付完成。 Flink中的connect算子可以连接两个保持他们类型的数据流两个数据流被connect之后只是被放在了一个同一个流中内部依然保持各自的数据和形式不发生任何变化两个流相互独立。 package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.common.protocol.types.Field;public class connect_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger data1 env.fromElements(1, 2, 3, 4, 5);DataStreamSourceString data2 env.fromElements(a, b, c);ConnectedStreamsInteger, String data3 data1.connect(data2);data3.getFirstInput().print(data1);data3.getSecondInput().print(data2);env.execute();} }4.2、union合并 package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class union_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger data1 env.fromElements(1, 2, 3);DataStreamSourceInteger data2 env.fromElements(555,666);DataStreamSourceInteger data3 env.fromElements(999);DataStreamInteger data data1.union(data2).union(data3);data.print();env.execute();} }
http://www.hkea.cn/news/14315758/

相关文章:

  • 班级建设网站首页怎么修改wordpress站点代码
  • 网站建设 ipc备案专业的免费网站建设
  • 浙江建设职业技术学院oa网站建设部网站资质升级陈述通过
  • 盖州网站优化中国十大做网站公司
  • 手机网站建设新闻手机优化系统
  • 腾度网站建设益阳住房和城乡建设局网站
  • 做网站管理员需要哪些知识一个虚拟主机能安装2个网站吗
  • 网站制作怎么自己做微信视频号怎么引流推广
  • 营销企业网站建设应遵守的原则企业采购
  • 做网站和彩票的同步开奖怎么做北京网站制作公司招聘
  • 2017年用什么语言做网站青岛中小微企业互联网站建设补贴
  • 免费代刷网站推广快速php团购网站开发
  • 国内专业的室内设计网站我的个人主页模板
  • 免费自创网站wordpress替换图片不显示
  • Excel怎么做网站链接优化师是做什么的
  • 网站图片做伪静态网站需要收集什么建站资源
  • 企业网站排名软件能优化wordpress 百万级数据
  • 建手机网站款软件怎样给自己做网站
  • 2018年做返利网站百度销售是做什么
  • 企业网站样板制作中国建筑网官网查询证书
  • 网站信息内容建设长春企业网站如何建设
  • 免费自建网站步骤安卓移动网站开发详解
  • 湘潭做网站 搜搜磐石网络优化设计四年级下册语文答案
  • 优秀网站设计 pdfwordpress建站打不开二级页面
  • 唐山哪个公司做网站怎么进入国外网站
  • 深圳市建设局网站首页wordpress 图片模糊
  • 广东建设厅的网站查询小红书推广营销
  • 盛泽网站建设网站建设就选
  • 网站标题设计品牌设计的原则有哪些
  • 上海网站建设 觉策动力全渠道营销的概念