有哪些行业需要做网站建设和推广,沈阳医大男科怎么样,网站备案教程,做网站的语言都有什么序列化
闭包检查
序列化方法和属性
依赖关系
RDD 血缘关系
RDD 窄依赖
RDD 宽依赖
RDD 任务划分
RDD 持久化
RDD Cache 缓存
RDD CheckPoint 检查点
缓存和检查点区别 序列化
闭包检查 从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 E…
序列化
闭包检查
序列化方法和属性
依赖关系
RDD 血缘关系
RDD 窄依赖
RDD 宽依赖
RDD 任务划分
RDD 持久化
RDD Cache 缓存
RDD CheckPoint 检查点
缓存和检查点区别 序列化
闭包检查 从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中就会导致算子内经常会用到算子外的数据这样就 形成了闭包的效果如果使用的算子外的数据无法序列化就意味着无法传值给 Executor 端执行就会发生错误所以需要在执行任务计算前检测闭包内的对象是否可以进行序列 化这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变
序列化方法和属性 从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。
object spark_02 {def main(args: Array[String]): Unit {//准备环境//*代表线程的核数 应用程序名称RDDval sparkConf new SparkConf().setMaster(local[*]).setAppName(RDD)val sc new SparkContext(sparkConf)val rdd: RDD[String] sc.makeRDD(Array(hello world, hello spark, hive, atguigu))//创建查询对象val search new Search(h)//函数传递打印ERROR Task not serializablesearch.getMatch1(rdd).collect().foreach(println)println()//属性传递打印ERROR Task not serializablesearch.getMatch2(rdd).collect().foreach(println)//关闭环境sc.stop()}
}//查询对象
//类的构造参数是类的属性构造参数需要进行闭包检查对类进行闭包检查
class Search(query:String) extends Serializable {def isMatch(s: String): Boolean {s.contains(query)}// 函数序列化案例def getMatch1 (rdd: RDD[String]): RDD[String] {rdd.filter(isMatch)}// 属性序列化案例def getMatch2(rdd: RDD[String]): RDD[String] {rdd.filter(x x.contains(query))}
} 依赖关系 相邻的两个RDD关系称之为依赖关系
RDD 血缘关系 多个连续的RDD的依赖关系称之为血缘关系 RDD 只支持粗粒度转换即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage 血统记录下来以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转 换行为当该 RDD 的部分分区数据丢失时它可以根据这些信息来重新运算和恢复丢失的 数据分区。
val fileRDD: RDD[String] sc.textFile(input/1.txt)
println(fileRDD.toDebugString) //打印输出血缘关系
println(----------------------)
val wordRDD: RDD[String] fileRDD.flatMap(_.split( ))
println(wordRDD.toDebugString)
println(----------------------)
val mapRDD: RDD[(String, Int)] wordRDD.map((_,1))
println(mapRDD.toDebugString)
println(----------------------)
val resultRDD: RDD[(String, Int)] mapRDD.reduceByKey(__)
println(resultRDD.toDebugString)
resultRDD.collect()
RDD 窄依赖 窄依赖表示每一个父(上游)RDD 的 Partition 最多被子下游RDD 的一个 Partition 使用 窄依赖我们形象的比喻为独生子女。
RDD 宽依赖 宽依赖表示同一个父上游RDD 的 Partition 被多个子下游RDD 的 Partition 依赖会 引起 Shuffle总结宽依赖我们形象的比喻为多生。
RDD 任务划分 RDD 任务切分中间分为Application、Job、Stage 和 Task
Application初始化一个 SparkContext 即生成一个 ApplicationJob一个 Action 算子就会生成一个 JobStageStage 等于宽依赖(ShuffleDependency)的个数加 1Task一个 Stage 阶段中最后一个 RDD 的分区个数就是 Task 的个数。
注意Application-Job-Stage-Task 每一层都是 1 对 n 的关系。 RDD 持久化
RDD Cache 缓存 RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存默认情况下会把数据以缓存 在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存而是触发后面的 action 算 子时该 RDD 将会被缓存在计算节点的内存中并供后面重用。
// cache 操作会增加血缘关系不改变原有的血缘关系
println(wordToOneRdd.toDebugString)
// 数据缓存。
wordToOneRdd.cache()
// 可以更改存储级别
//mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)缓存有可能丢失或者存储于内存的数据由于内存不足而被删除RDD 的缓存容错机 制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换丢失的数 据会被重算由于 RDD 的各个 Partition 是相对独立的因此只需要计算丢失的部分即可 并不需要重算全部 Partition。 Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如reduceByKey)。这样 做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是在实际使用的时 候如果想重用数据仍然建议调用 persist 或 cache。
RDD CheckPoint 检查点 所谓的检查点其实就是通过将 RDD 中间结果写入磁盘 由于血缘依赖过长会造成容错成本过高这样就不如在中间阶段做检查点容错如果检查点 之后有节点出现问题可以从检查点开始重做血缘减少了开销。 对 RDD 进行 checkpoint 操作并不会马上被执行必须执行 Action 操作才能触发。
// 设置检查点路径
sc.setCheckpointDir(./checkpoint1)
// 创建一个 RDD读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] sc.textFile(input/1.txt)
// 业务逻辑
val wordRdd: RDD[String] lineRdd.flatMap(line line.split( ))
val wordToOneRdd: RDD[(String, Long)] wordRdd.map {word {(word, System.currentTimeMillis())}
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)缓存和检查点区别
Cache 缓存只是将数据保存起来不切断血缘依赖。Checkpoint 检查点切断血缘依赖。Cache 缓存的数据通常存储在磁盘、内存等地方可靠性低。Checkpoint 的数据通常存 储在 HDFS 等容错、高可用的文件系统可靠性高。建议对 checkpoint()的 RDD 使用 Cache 缓存这样 checkpoint 的 job 只需从 Cache 缓存 中读取数据即可否则需要再从头计算一次 RDD。