乡村建设规划网站,wordpress抖音,公司代运营,亚马逊品牌官网建设键值对RDD#xff08;Pair RDD#xff09;是指每个RDD元素都是#xff08;key#xff0c;value#xff09;键值对类型#xff0c;是一种常见的RDD类型#xff0c;可以应用于很多的应用场景。
一、 键值对RDD的创建
键值对RDD的创建主要有两种方式#xff1a; #x…键值对RDDPair RDD是指每个RDD元素都是keyvalue键值对类型是一种常见的RDD类型可以应用于很多的应用场景。
一、 键值对RDD的创建
键值对RDD的创建主要有两种方式 1从文件中加载生成RDD 2通过并行集合数组创建RDD。
1从文件中加载生成RDD
首先使用textFile()方法从文件中加载数据然后使用map()函数转换得到相应的键值对RDD。
scala val lines sc.textFile(file:///usr/local/spark/mycode/pairrdd/word.txt)
lines: org.apache.spark.rdd.RDD[String] file:///usr/local/spark/mycode/pairrdd/ word.txtMapPartitionsRDD[1] at textFile at console:27
scala val pairRDD lines.flatMap(line line.split( )).map(word (word,1)) pairRDD: org.apache.spark.rdd.RDD[(String, Int)] MapPartitionsRDD[3] at map at console:29
scala pairRDD.foreach(println)
(i,1)
(love,1)
(hadoop,1)
…… map(word (word,1))函数的作用是取出RDD中的每个元素也就是每个单词赋值给word然后把word转换成(word,1)的键值对形式。
2通过并行集合数组创建RDD
scala val list List(Hadoop,Spark,Hive,Spark)
list: List[String] List(Hadoop, Spark, Hive, Spark) scala val rdd sc.parallelize(list) rdd: org.apache.spark.rdd.RDD[String] ParallelCollectionRDD[11] at parallelize at console:29
scala val pairRDD rdd.map(word (word,1)) pairRDD: org.apache.spark.rdd.RDD[(String, Int)] MapPartitionsRDD[12] at map at console:31
scala pairRDD.foreach(println)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)二、常用的键值对转换操作
常用的键值对转换操作包括reduceByKey(func)、groupByKey()、keys、values、sortByKey()、mapValues(func)、join和combineByKey等。
1reduceByKey(func)
reduceByKey(func)的功能是使用func函数合并具有相同键的值。 有一个键值对RDD包含4个元素分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1)。可以使用reduceByKey()操作得到每个单词的出现次数代码及其执行结果如下
scala pairRDD.reduceByKey((a,b)ab).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)2·groupByKey()
groupByKey()的功能是对具有相同键的值进行分组。 有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)采用groupByKey()后得到的结果是(“spark”,(1,2))和(“hadoop”,(3,5))代码及其执行结果如下
scala pairRDD.groupByKey()
res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] ShuffledRDD[15] at groupByKeyat console:34reduceByKey和groupByKey的区别是reduceByKey用于对每个key对应的多个value进行聚合操作并且聚合操作可以通过函数func进行自定义groupByKey也是对每个key进行操作但是对每个key只会生成一个value-listgroupByKey本身不能自定义函数需要先用groupByKey生成RDD然后才能对此RDD通过map进行自定义函数操作。
3keys()
键值对RDD每个元素都是(key,value)的形式keys操作只会把键值对RDD中的key返回形成一个新的RDD。
有一个键值对RDD名称为pairRDD包含4个元素分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1)可以使用keys方法取出所有的key并打印出来代码及其执行结果如下
scala pairRDD.keys
res17: org.apache.spark.rdd.RDD[String] MapPartitionsRDD[17] at keys at console:34
scala pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark4values()
values操作只会把键值对RDD中的value返回形成一个新的RDD。
有一个键值对RDD名称为pairRDD包含4个元素分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1)可以使用values方法取出所有的value并打印出来代码及其执行结果如下
scala pairRDD.values
res0: org.apache.spark.rdd.RDD[Int] MapPartitionsRDD[2] at values at console:34
scala pairRDD.values.foreach(println)
1
1
1
15sortByKey()
sortByKey()的功能是返回一个根据key排序的RDD。
有一个键值对RDD名称为pairRDD包含4个元素分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1)使用sortByKey()的效果如下
scala pairRDD.sortByKey()
res0: org.apache.spark.rdd.RDD[(String, Int)] ShuffledRDD[2] at sortByKey at console:34
scala pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)6sortBy()
sortByKey()的功能是返回一个根据key排序的RDD而sortBy()则可以根据其他字段进行排序。
scala val d1 sc.parallelize(Array((c,8),(b,25),(c,17),(a,42),(b,4),(d,9),(e,17),(c,2),(f,29),(g,21),(b,9)))
scala d1.reduceByKey(__).sortByKey(false).collect res2: Array[(String, Int)] Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))sortByKey(false)括号中的参数false表示按照降序排序如果没有提供参数false则默认采用升序排序。从上面排序后的效果可以看出所有键值对都按照key的降序进行了排序因此输出Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))。
7mapValues(func)
mapValues(func)对键值对RDD中的每个value都应用一个函数但是key不会发生变化。 有一个键值对RDD名称为pairRDD包含4个元素分别是(“Hadoop”,1)、(“Spark”,1)、(“Hive”,1)和(“Spark”,1)下面使用mapValues()操作把所有RDD元素的value都增加1
scala pairRDD.mapValues(x x1)res2: org.apache.spark.rdd.RDD[(String, Int)] MapPartitionsRDD[4] at mapValues at console:34 scala pairRDD.mapValues(x x1).foreach(println) (Hadoop,2) (Spark,2) (Hive,2) (Spark,2)8join()
join表示内连接对于给定的两个输入数据集(K,V1)和(K,V2)只有在两个数据集中都存在的key才会被输出最终得到一个(K,(V1,V2))类型的数据集。
scala val pairRDD1 sc.| parallelize(Array((spark,1),(spark,2),(hadoop,3),(hadoop,5)))
scala val pairRDD2 sc.parallelize(Array((spark,fast)))
scala pairRDD1.join(pairRDD2)
scala pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))pairRDD1中的键值对(“spark”,1)和pairRDD2中的键值对(“spark”,“fast”)因为二者具有相同的key即spark所以会产生连接结果(“spark”,(1,“fast”))。
9combineByKey()
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner,mapSideCombine)中的各个参数的含义如下 1createCombiner在第一次遇到key时创建组合器函数将RDD数据集中的V类型值转换C类型值V C 2mergeValue合并值函数再次遇到相同的Key时将createCombiner的C类型值与这次传入的V类型值合并成一个C类型值C,VC 3mergeCombiners合并组合器函数将C类型值两两合并成一个C类型值 4partitioner使用已有的或自定义的分区函数默认是HashPartitioner 5mapSideCombine是否在map端进行Combine操作默认为true。 文章来源《Spark编程基础》 作者林子雨 文章内容仅供学习交流如有侵犯联系删除哦