当前位置: 首页 > news >正文

广州 建 网站wordpress 全站pjax

广州 建 网站,wordpress 全站pjax,做局域网站数据库,一级a做爰片免费网站短视频播放Source源算子#xff08;基础篇二#xff09; 目录 Source源算子#xff08;基础篇二#xff09; 二、源算子#xff08;source#xff09; 1. 准备工作 2.从集合中读取数据 可以使用代码中的fromCollection()方法直接读取列表 也可以使用代码中的fromElements()方…Source源算子基础篇二 目录 Source源算子基础篇二 二、源算子source 1. 准备工作 2.从集合中读取数据 可以使用代码中的fromCollection()方法直接读取列表 也可以使用代码中的fromElements()方法直接列出数据获取 3. 从文件中读取数据 说明 4. 从Socket读取数据 1编写StreamWordCount 2在 Linux 环境的主机bigdata1 上执行下列命令发送数据进行测试 3启动 StreamWordCount 程序 4从 bigdata1 发送数据 5看控制台的输出结果 5.从Kafka读取数据 6.自定义源算子source 7.Flink支持的数据类型 二、源算子source Flink 可以从各种来源获取数据然后构建 DataStream 进行转换处理。一般将数据的输入 来源称为数据源而读取数据的算子就是源算子Source。所以Source 就是我们整个处理 程序的输入端。 Flink 代码中通用的添加 Source 的方式是调用执行环境的 addSource()方法 //通过调用 addSource()方法可以获取 DataStream 对象 val stream env.addSource(...) 方法传入一个对象参数需要实现 SourceFunction 接口返回一个 DataStream。 1. 准备工作 case class Event(user: String, url: String, timestamp: Long) 2.从集合中读取数据 最简单的读取数据的方式就是在代码中直接创建一个集合然后调用执行环境的 fromCollection 方法进行读取。这相当于将数据临时存储到内存中形成特殊的数据结构后 作为数据源使用一般用于测试。 import org.apache.flink.streaming.api.scala._case class Event(user: String, url: String, timestamp: Long)object SourceCollection {def main(args: Array[String]): Unit {//获取流执行环境val env StreamExecutionEnvironment.getExecutionEnvironment//设置并行度并行任务的数量为1env.setParallelism(1)// 创建包含点击事件的列表// 点击操作中包含两个事件val clicks List(Event(Mary, /.home, 1000L), Event(Bob, /.cart, 2000L))//将列表作为流输出//把clicks作为数据流val stream env.fromCollection(clicks)//fromElements从给定的元素集合中创建一个DataStreamval stream1 env.fromElements(Event(zhangsan,/.opt,1000L),Event(lisi,/.opt,2000L))stream.print(stream)stream1.print(stream1)env.execute()} } 可以使用代码中的fromCollection()方法直接读取列表 也可以使用代码中的fromElements()方法直接列出数据获取 3. 从文件中读取数据 真正的实际应用中自然不会直接将数据写在代码中。通常情况下我们会从存储介质中 获取数据一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。 val stream env.readTextFile(input/words.txt)说明 参数可以是文件可以是目录 可以是绝对路径也可以是相对路径 相对路径是从系统属性 user.dir 获取路径:在 IDEA 下是 project 的根目录, standalone 模式下是集群节点根目录 系统属性 user.dir这是一个Java系统属性它表示用户当前的工作目录。在很多应用中它通常被用作参考路径。 IDEA下是project的根目录当你在IDEA中打开一个项目时项目的根目录通常是IDEA的工作目录。相对路径就是基于这个根目录来确定的。 standalone模式下是集群节点根目录如Hadoop分布式计算系统中的独立模式standalone mode。在这种模式下路径可能是相对于集群节点的根目录。 也可以从 HDFS 目录下读取, 使用路径 hdfs://... 前提要在pom文件中添加hadoop相关依赖 4. 从Socket读取数据 不论从集合还是文件我们读取的其实都是有界数据。在流处理的场景中数据往往是无 界的。一个简单的例子就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、 稳定性较差一般也是用于测试。 //通过主机名和端口号读取socket文本流val linDs env.socketTextStream(bigdata1,7777) 具体实现案例 1编写StreamWordCount import org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit {val env StreamExecutionEnvironment.getExecutionEnvironment//通过主机名和端口号读取socket文本流val linDs env.socketTextStream(bigdata1,7777)//进行转换计算val result linDs.flatMap(data data.split( )) //用空格切分字符串.map((_,1)) //切分后的字符串转换为一个元组.keyBy(_._1) //使用元组的第一个字段进行分组.sum(1) //分组后的数据的第二个字段进行累加//打印计算结果result.print()env.execute()} } 2在 Linux 环境的主机bigdata1 上执行下列命令发送数据进行测试 $ nc -lk 7777 3启动 StreamWordCount 程序 我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为 Flink 的流处 理是事件驱动的当前程序会一直处于监听状态只有接收到数据才会执行任务、输出统计结果。 4从 bigdata1 发送数据 hello flink hello world hello scala 5看控制台的输出结果 5.从Kafka读取数据 Kafka 作为分布式消息传输队列是一个高吞吐、易于扩展的消息系统。 而消息队列的传输方式恰恰和流处理是完全一致的。 所以可以说 Kafka 和 Flink 天生一对是当前处理流式数据的双子星。 在如今的实时流处理应用中由 Kafka 进行数据的收集和传输Flink 进行分析计算这样的架构已经成为众多企业的首选 调用 env.addSource()传入 FlinkKafkaConsumer 的对象实例就可以了。 创建 FlinkKafkaConsumer 时需要传入三个参数 第一个参数 topic定义了从哪些主题中读取数据。可以是一个 topic也可以是 topic 列表还可以是匹配所有想要读取的 topic 的正则表达式。当从多个 topic 中读取数据 时Kafka 连接器将会处理所有 topic 的分区将这些分区的数据放到一条数据流中 去。第二个参数是一个 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消 息被存储为原始的字节数据所以需要反序列化成 Java 或者 Scala 对象。上面代码中 53 使用的 SimpleStringSchema是一个内置的 DeserializationSchema它只是将字节数 组简单地反序列化成字符串。DeserializationSchema 和 KeyedDeserializationSchema 是 公共接口所以我们也可以自定义反序列化逻辑。第三个参数是一个 Properties 对象设置了 Kafka 客户端的一些属性。 更新中... 6.自定义源算子source 接下来我们创建一个自定义的数据源实现 SourceFunction 接口。主要重写两个关键方法 run()和 cancel()。 run()方法使用运行时上下文对象SourceContext向下游发送数据cancel()方法通过标识位控制退出循环来达到中断数据源的效果。 7.Flink支持的数据类型
http://www.hkea.cn/news/14325225/

相关文章:

  • jsp网站开发需要什么技术编程教程免费视频
  • 做网站怎么赚钱 111哪里可以做拍卖网站
  • 深圳专业的网站建设哪个网站做原创歌曲
  • 曲阳路街道网站建设网络推广是做什么工作
  • wordpress无法开启多站点远程教育网站建设
  • 假网站怎么做呢企业品牌网站建设应该怎么做
  • 无锡手机网站建设公司怎样创建网页
  • html5手机网站织梦模板怎么把本地wordpress上传到服务器
  • 网站降权怎么办雅虎搜索引擎首页
  • 高校校园网站建设自定义导航网站 源码
  • 光谷做网站推广电话湖南住建云
  • 上海网站推洛阳万悦网站建设
  • 重庆好的网站建设公司龙岩市人才网最新招聘信息
  • 优秀行业网站简阳网站建设
  • 关于网站维护的书籍打不开建设银行网站
  • 灵芝住房和城乡建设局局网站wordpress 让置顶显示在分类目前
  • 上海网站模板wordpress蘑菇街
  • 做英语阅读的网站南充城市建设投诉网站
  • 做网站用什么服务器会比较好wordpress页面是什么
  • 做响应式网站的流程暖暖韩国中文免费观看播放
  • 手机 网站做移动网站建设
  • 如何做国际网站邯郸做网站价格
  • 论学院网站建设项目的进度管理制度福建省网站建设有限公司
  • 网站开发可行性分析报告范文台州做网站优化
  • 代理网站地址密云富阳网站建设
  • 跨境电商网络营销是什么镇江关键字优化公司
  • 珠海网站设计网络管理员考试
  • 网上学习做网站知识付费微网站开发
  • 网站建设模板下载h5网站开发实例教程
  • 四川网站推广c2c平台排名