红色企业网站模板,vue 实现网站开发,pr模板网,圣都家装公司简介1. RDD的设计背景
在实际应用中#xff0c;存在许多迭代式计算#xff0c;这些应用场景的共同之处是#xff0c;不同计算阶段之间会重用中间结果#xff0c;即一个阶段的输出结果会作为下一个阶段的输入。但是#xff0c;目前的MapReduce框架都是把中间结果写入到HDFS中存在许多迭代式计算这些应用场景的共同之处是不同计算阶段之间会重用中间结果即一个阶段的输出结果会作为下一个阶段的输入。但是目前的MapReduce框架都是把中间结果写入到HDFS中带来了大量的数据复制、磁盘IO和序列化开销。显然如果能将结果保存在内存当中就可以大量减少IO。RDD就是为了满足这种需求而出现的它提供了一个抽象的数据架构我们不必担心底层数据的分布式特性只需将具体的应用逻辑表达为一系列转换处理不同RDD之间的转换操作形成依赖关系可以实现管道化从而避免了中间结果的落地存储大大降低了数据复制、磁盘IO和序列化开销。
2. RDD的概念
RDDResilient Distributed Datasets弹性分布式数据集代表可并行操作元素的不可变分区集合。
一个RDD就是一个分布式对象集合本质上是一个只读的分区记录集合每个RDD可以分成多个分区每个分区就是一个数据集片段HDFS上的块并且一个RDD的不同分区可以被保存到集群中不同的节点上从而可以在集群中的不同节点上进行并行计算。
RDD提供了一种高度受限的共享内存模型即RDD是只读的记录分区的集合不能直接修改只能基于稳定的物理存储中的数据集来创建RDD或者通过在其他RDD上执行确定的转换操作如map、join和groupBy而创建得到新的RDD。
RDD提供了一组丰富的操作以支持常见的数据运算分为“行动”Action和“转换”Transformation两种类型前者用于执行计算并指定输出的形式后者指定RDD之间的相互依赖关系。两类操作的主要区别是转换操作比如map、filter、groupBy、join等接受RDD并返回RDD而行动操作比如count、collect等接受RDD但是返回非RDD即输出一个值或结果。
RDD典型的执行过程
Spark用Scala语言实现了RDD的API程序员可以通过调用API实现对RDD的各种操作。RDD典型的执行过程如下
1RDD读入外部数据源或者内存中的集合进行创建
2RDD经过一系列的“转换”操作每一次都会产生不同的RDD供给下一个“转换”使用
3最后一个RDD经“行动”操作进行处理并输出到外部数据源或者变成Scala/JAVA集合或变量。
需要说明的是RDD采用了惰性调用即在RDD的执行过程中真正的计算发生在RDD的“行动”操作行动算子底层代码调用了runJob函数对于“行动”之前的所有“转换”操作Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹即相互之间的依赖关系而不会触发真正的计算。 val conf new SparkConf
val sparkContext new SparkContext(conf)
val lines :RDD sparkContext.textFile(logFile)
//lines.filter((a:String) a.contains(hello world))
val count lines.filter(_.contains(hello world)).count()
println(count)
可以看出一个Spark应用程序基本是基于RDD的一系列计算操作。
第1行代码用于创建JavaSparkContext对象
第2行代码从HDFS文件中读取数据创建一个RDD
第3行代码对fileRDD进行转换操作得到一个新的RDD即filterRDD
count()是一个行动操作用于计算一个RDD集合中包含的元素个数。
这个程序的执行过程如下
1创建这个Spark程序的执行上下文即创建SparkContext对象
2从外部数据源即HDFS文件中读取数据创建fileRDD对象
3构建起fileRDD和filterRDD之间的依赖关系形成DAG图这时候并没有发生真正的计算只是记录转换的轨迹
4执行action代码时count()是一个行动类型的操作触发真正的计算开始执行从fileRDD到filterRDD的转换操作并把结果持久化到内存中最后计算出filterRDD中包含的元素个数。
3. spark任务的执行过程
每一个应用都是由driver端组成的并且driver端可以解析用户的代码并且在集群中并行执行spark给大家提供了一个编程对象它是一个抽象的叫做弹性分布式数据集这个数据集和一堆数据的集合并且是被分区的因为分区的数据可以被并行的进行操作rdd的创建方式有两种 1.读取hdfs的文件 2.在driver的一个集合可以转换为rddrdd可以被持久化到内存中并且rdd可以实现更好的失败恢复容错。 为什么rdd是抽象的呢因为rdd并不存在数据它是虚拟的我们在定义逻辑的时候要标识一个节点表示数据在流动到此处的时候要进行什么样的处理我们可以理解rdd是一个代理对象。 上述任务执行过程可以划分为两个stage从创建rdd开始到groupBy的shuffle划分为一个stage然后该shuffle到任务执行结束又是一个stage。后面读源码我们会发现当出现shuffle时就要划分出一个阶段。因为业务逻辑发生了变化。
任务的执行和层架关系 读取hdfs数据的时候映射应该是一个blk块对应一个分区
在一个任务中一个action算子会生成一个job。行动算子的源码都会包含runJob函数在一个job中存在shuffle算子比如group sort切分阶段shuffle1个阶段。shuffle是任务的划分的重点前面的任务会将数据放入到自己的本地存储后续的任务进行数据的拉取。在一个stage中任务都是管道形式执行的避免了io序列化和反序列化这个就是dag切分的原理。在一个阶段中分区数量就是task任务的数量task任务就是一堆非shuffle类算子的整体任务链。有几个分区就会并行的执行几个task任务。有几个分区是根据读取的文件来进行适配的比如有三个blk那么就会生成三个分区因为我们可以在每个分区中进行处理数据实现本地化的处理避免远程io。
我们知道分区的个数与读取的文件的Split切片数量有关。假如textFile读取文件的大小为400M则会被物理切分为3个block因为每个block-size的大小最大为128Mblock1为128Mblock2为128Mblock3为144M。默认逻辑切片split-size的大小与block-size相适配为128M所以有三个分区。三个分区就会并行的执行3个task任务。
spark中一个executor可以执行多个task任务。这是通过将executor配置为拥有多个cores来实现的。每个核心可以并行执行一个task。即executor是一个JVM进程负责在节点上运行任务。可以为executor配置多个核心来并行处理多个任务。
如果分区数多于executor的核心数某些task必须等待其他task任务完成才能开始执行。