当前位置: 首页 > news >正文

网站制作网站价格微信开发者模式

网站制作网站价格,微信开发者模式,公司网站域名注册费用,网站技术支持是什么一、RDD入门 1.RDD是什么#xff1f; RDD是一个容错的、只读的、可进行并行操作的数据结构#xff0c;是一个分布在集群各个节点中的存放元素的集合#xff0c;即弹性分布式数据集。 2.RDD的三种创建方式 第一种是将程序中已存在的集合#xff08;如集合、列表、数组 RDD是一个容错的、只读的、可进行并行操作的数据结构是一个分布在集群各个节点中的存放元素的集合即弹性分布式数据集。 2.RDD的三种创建方式 第一种是将程序中已存在的集合如集合、列表、数组转换成RDD。 第二种是读取外部数据集来创建RDD。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.1准备数据:将程序中已存在的集合如集合、列表、数组转换成RDDval rdd1 sc.parallelize(List(1,2,3,4))val rdd2 sc.makeRDD(List(1,2,3,4))//2.2准备数据读取外部数据集来创建RDDval rdd3 sc.textFile(dataset/words.txt)//3.查看数据rdd1.collect().foreach(println)println(-------------------------)rdd2.collect().foreach(println)println(-------------------------)rdd3.collect().foreach(println)} 第三种是对已有RDD进行转换得到新的RDD在RDD的操作方法中讲解。 二、单个RDD的转换操作 Spark RDD提供了丰富的操作方法函数用于操作分布式的数据集合包括转换操作和行动操作两部分。 转换操作可以将一个RDD转换为一个新的RDD但是转换操作是懒操作不会立刻执行计算行动操作是用于触发转换操作的操作这时才会真正开始进行计算。 1map()方法 作用把 RDD 中的数据 一对一 的转为另一种形式。 格式def map[U: ClassTag](f: T ⇒ U): RDD[U] Map 算子是 原RDD → 新RDD 的过程, 传入函数的参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(1,2,3,4))//3.RDD的转换操作mapvar rdd2 rdd1.map(x x * 10)//4.打印数据rdd2.collect().foreach(println)//10 20 30 40} 2flatMap()方法 作用flatMap 算子和 Map 算子类似, 但是 flatMap 是一对多。 格式def flatMap[U: ClassTag](f: T ⇒ List[U]): RDD[U] 参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据, 需要注意的是返回值是一个集合, 集合中的数据会被展平后再放入新的 RDD。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(How are you,I am fine,What about you))//3.RDD的转换操作flatMapvar rdd2 rdd1.flatMap(x x.split( ))//4.打印数据rdd2.collect().foreach(println) //How are you I am fine What about you}3sortBy()方法 作用用于对标准RDD进行排序有3个可输入参数。 格式def sortBy(func, ascending, numPartitions) 参数func指定按照哪个字段来排序通过这个函数返回要排序的字段scending 是否升序默认是true即升序排序如果需要降序排序那么需要将参数的值设置为false。numPartitions 分区数。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List((1,3),(45,2),(7,6)))//3.RDD的转换操作sortBy,按照元组第二个值进行false降序val rdd2 rdd1.sortBy(x x._2,false,1)//4.打印数据 rdd2.collect().foreach(println) //(7,6) (1,3) (45,2)} 4mapPartitionsWithIndex()方法 作用对RDD中的每个分区带有下标进行操作通过自己定义的一个函数来处理。 格式def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) Iterator[U]) 参数f 是函数参数接收两个参数        1Int代表分区号        2Iterator[T]分区中的元素        3返回Iterator[U]操作完后返回的结果 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据,并设置为2个分区val rdd1 sc.parallelize(List(1,2,3,4,5),2)//3.RDD的转换操作mapPartionsWithIndexval rdd2 rdd1.mapPartitionsWithIndex((index,it){it.toList.map(x[index,x]).iterator})//4.打印数据:【分区编号,数据】rdd2.collect().foreach(println) //[0,1] [0,2] [1,3] [1,4] [1,5]}5filter()方法 作用是一种转换操作用于过滤RDD中的元素。 格式def filter(f: T Boolean): RDD[T] 将返回值为true的元素保留将返回值为false的元素过滤掉最后返回一个存储符合过滤条件的所有元素的新RDD。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(1,2,3,4))//3.RDD的转换操作filter过滤偶数val rdd2 rdd1.filter(x x%20)//4.打印数据rdd2.collect().foreach(println) //2 4}6distinct()方法 作用是一种转换操作用于RDD的数据去重去除两个完全相同的元素没有参数。 格式def distinct(): RDD[T] 将数据集中重复的数据去重,返回一个没有重复元素的新RDD。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(1,1,2,3,3))//3.RDD的转换操作filterval rdd2 rdd1.distinct()//4.打印数据rdd2.collect().foreach(println) //1 2 3} 三、多个RDD的集合操作 1union()方法 作用是一种并集转换操作用于将两个RDD合并成一个不进行去重操作而且两个RDD中每个元素中的值的数据类型需要保持一致。 格式def union(other: RDD[T]): RDD[T] 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(1,2,3,4))val rdd2 sc.parallelize(List(1,3,5,6))//3.RDD的转换操作unionval rdd3 rdd1.union(rdd2)//4.打印数据rdd3.collect().foreach(println) //1,2,3,4,1,3,5,6} 2intersection()方法 作用是一种交集转换操作用于将求出两个RDD的共同元素。 格式def intersection(other: RDD[T]): RDD[T] 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD两个RDD的顺序不会影响结果。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(1,2,3,4))val rdd2 sc.parallelize(List(1,2))//3.RDD的转换操作intersectionval rdd3 rdd1.intersection(rdd2)//4.打印数据rdd3.collect().foreach(println) //1,2} 3subtract()方法 作用是一种补集转换操作用于将前一个RDD中在后一个RDD出现的元素删除返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。 格式def subtract(other: RDD[T]): RDD[T] 将原RDD里和参数RDD里相同的元素去掉后返回一个新的 RDD两个RDD的顺序会影响结果。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(1,2,3,4))val rdd2 sc.parallelize(List(1,2,5))//3.RDD的转换操作subtractval rdd3 rdd1.subtract(rdd2)val rdd4 rdd2.subtract(rdd1)//4.打印数据rdd3.collect().foreach(println) //3,4println(----------------------)rdd4.collect().foreach(println) //5} 4cartesian()方法 作用是一种求笛卡儿积操作用于将两个集合的元素两两组合成一组。 格式def cartesian(other: RDD[T]): RDD[T] 将原RDD里的每个元素都和参数RDD里的每个组合成一组返回一个新的RDD。两个RDD的顺序会影响结果。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(1,3,4))val rdd2 sc.parallelize(List(1,2))//3.RDD的转换操作cartesianval rdd3 rdd1.cartesian(rdd2)val rdd4 rdd2.cartesian(rdd1)//4.打印数据rdd3.collect().foreach(println) //(1,1) (1,2) (3,1) (3,2) (4,1) (4,2)println(----------------------)rdd4.collect().foreach(println) //(1,1) (1,3) (1,4) (2,1) (2,3) (2,4)} 四、单个键值对RDD的转换操作 Spark的大部分RDD操作都支持所有种类的单值RDD但是有少部分特殊的操作只能作用于键值对类型的RDD。键值对RDD由一组组的键值对组成这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。 1创建键值对RDD 1.将一个普通RDD通过map转化为Pair RDD。当需要将一个普通的RDD转化为一个PairRDD时可以使用map函数来进行操作传递的函数需要返回键值对。 2.通过List直接创建Pair RDD。 3.使用zip()方法用于将两个RDD组成Pair RDD。要求两个及元素数量相同否则会抛出异常。  def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(I like spark,He likes spark))//3.1 通过flatMap和map方法将一个普通的RDD转化为一个键值对RDDval rdd2 rdd1.flatMap(x x.split( ))val rdd3 rdd2.map(x (x,1))//3.2 通过List直接创建Pair RDDval rdd4 sc.parallelize(List((张三,100),(李四,90),(王五,80)))//3.3 使用zip()方法用于将两个RDD组成Pair RDDval dataRdd1 sc.parallelize(List(1,2,3),2)val dataRdd2 sc.parallelize(List(A,B,C),2)val dataRdd3 dataRdd1.zip(dataRdd2)//4.打印数据rdd2.collect().foreach(println) //(I,like,spark,He,likes,spark)println(---------------------)rdd3.collect().foreach(println) //(I,1) (like,1) (spark,1) (He,1) (likes,1) (spark,1)println(---------------------)rdd4.collect().foreach(println) //(张三,100) (李四,90) (王五,80)println(---------------------)dataRdd3.collect().foreach(println) //(1,A) (2,B) (3,C)} 2键值对RDD的keys和values方法 键值对RDD包含键和值两个部分。 Spark提供了两种方法分别获取键值对RDD的键和值。 keys方法返回一个仅包含键的RDD。values方法返回一个仅包含值的RDD。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List((张三,100),(李四,90),(王五,80)))//3.1 获取keysval rdd2 rdd1.keys//3.2 获取valuesval rdd3 rdd1.values//3.打印数据rdd2.collect().foreach(println) //张三 李四 王五println(---------------------)rdd3.collect().foreach(println) //100 90 80} 3键值对RDD的reduceByKey() 作用将相同键的前两个值传给输入函数产生一个新的返回值新产生的返回值与RDD中相同键的下一个值组成两个元素再传给输入函数直到最后每个键只有一个对应的值为止。 格式def reduceByKey(func: (V, V) V): RDD[(K, V)] 可以将数据按照相同的 Key 对 Value 进行聚合。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List(I like spark,He likes spark))//3.1 通过flatMap和map方法将一个普通的RDD转化为一个键值对RDDval rdd2 rdd1.flatMap(x x.split( ))val rdd3 rdd2.map(x (x,1)) // (I,1) (like,1) (spark,1) (He,1) (likes,1) (spark,1)//3.2 使用reduceByKey将相同键的值进行相加统计词频val rdd4 rdd3.reduceByKey((a,b) ab)//4.打印数据rdd4.collect().foreach(println) //(I,1) (He,1) (spark,2) (like,1) (likes,1)} 3键值对RDD的groupByKey() 作用按照 Key 分组, 和 reduceByKey 有点类似, 但是 groupByKey 并不求聚合只是列举 Key 对应的所有 Value。 格式def groupByKey(): RDD[(K, Iterable[V])] 对于一个由类型K的键和类型V的值组成的RDD通过groupByKey()方法得到的RDD类型是[K,Iterable[V]]可以将数据源的数据根据 key 对 value 进行分组。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List((a,1),(a,2),(b,1),(c,1),(c,1)))//3.使用groupByKey进行分组val rdd2 rdd1.groupByKey()//4.使用map方法查看分组后每个分组中的值的数量val rdd3 rdd2.map(x (x._1,x._2.size))//5.打印数据rdd2.collect().foreach(println) //(a,CompactBuffer(1, 2)) (b,CompactBuffer(1)) (c,CompactBuffer(1, 1))rdd3.collect().foreach(println) //(a,2) (b,1) (c,2)} 五、多个键值对RDD的转换操作 在Spark中键值对RDD提供了很多基于多个RDD的键进行操作的方法。 1join()方法 作用用于根据键对两个RDD进行内连接将两个RDD中键相同的数据的值存放在一个元组中最后只返回两个RDD中都存在的键的连接结果。 格式def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] 在类型为(K,V)和(K,W)的 RDD 上调用返回一个相同 key 对应的所有元素连接在一起的。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List((a,1),(b,2),(c,3)))val rdd2 sc.parallelize(List((a,2),(b,4),(e,5)))//3.使用join进行内连接val rdd3 rdd1.join(rdd2)//4.打印数据rdd3.collect().foreach(println) //(a,(1,2)) (b,(2,4))} 2rightOuterJoin()方法 作用用于根据键对两个RDD进行右外连接连接结果是右边RDD的所有键的连接结果不管这些键在左边RDD中是否存在。 格式def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] 类似于 SQL 语句的右外连接。如果在左边RDD中有对应的键那么连接结果中值显示为Some类型值如果没有那么显示为None值。 3leftOuterJoin()方法 作用用于根据键对两个RDD进行左外连接连接结果是左边RDD的所有键的连接结果不管这些键在右边RDD中是否存在。 格式def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] 类似于 SQL 语句的右外连接。如果在右边RDD中有对应的键那么连接结果中值显示为Some类型值如果没有那么显示为None值。 4fullOuterJoin()方法 作用用于对两个RDD进行全外连接保留两个RDD中所有键的连接结果。 格式def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] 类似于 SQL 语句的全外连接。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List((a,1),(b,2),(c,3)))val rdd2 sc.parallelize(List((a,2),(b,4),(e,5)))//3.使用rightOuterJoin进行右外连接val rdd3 rdd1.rightOuterJoin(rdd2)//4.使用leftOuterJoin进行左外连接val rdd4 rdd1.leftOuterJoin(rdd2)//5.使用fullOuterJoin进行全外连接val rdd5 rdd1.fullOuterJoin(rdd2)//6.打印数据rdd3.collect().foreach(println) //(a,(Some(1),2)) (b,(Some(2),4)) (e,(None,5))println(--------------------)rdd4.collect().foreach(println) //(a,(1,Some(2))) (b,(2,Some(4))) (c,(3,None))println(--------------------)rdd5.collect().foreach(println) //(a,(Some(1),Some(2))) (b,(Some(2),Some(4))) (c,(Some(3),None)) (e,(None,Some(5)))} 5sortByKey()方法 作用作用于Key-Value形式的RDD并对Key进行排序。 格式def sortByKey(ascending: Boolean true, numPartitions: Int self.partitions.length)  : RDD[(K, V)] 参数scending 是否升序默认是true即升序排序如果需要降序排序那么需要将参数的值设置为false。numPartitions 为分区数。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List((1,3),(45,2),(7,6)))//3.RDD的转换操作sortByKeyval rdd2 rdd1.sortByKey(true) //按key进行升序val rdd3 rdd1.sortByKey(false) //按key进行降序//4.打印数据rdd2.collect().foreach(println)println(---------------------)rdd3.collect().foreach(println)} 6lookup()方法 作用作用于键值对RDD返回指定键的所有值。 格式def lookup(key : K) : scala.Seq[V] 作用于K-V类型的RDD上返回指定K的所有V值 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//2.准备数据val rdd1 sc.parallelize(List((张三,100),(李四,90),(王五,80)))//3.使用lookup()方法查找指定键的值val result rdd1.lookup(李四)//4.打印数据println(result) //WrappedArray(90)} 7combineByKey()方法 作用用于将键相同的数据聚合并且允许返回类型与输入数据的类型不同的返回值。 格式def combineByKey[C](                  createCombiner: V C,                  mergeValue: (C, V) C,                  mergeCombiners: (C, C) C): RDD[(K, C)] combineByKey()方法接收3个重要的参数具体说明如下 createCombiner:VCV是键值对RDD中的值部分将该值转换为另一种类型的值CC会作为每一个键的累加器的初始值。mergeValue:(C,V)C该函数将元素V聚合到之前的元素C(createCombiner)上这个操作在每个分区内进行。mergeCombiners:(C,C)C该函数将两个元素C进行合并这个操作在不同分区间进行)。 小练习将数据 List((zhangsan, 99.0), (zhangsan, 96.0), (lisi, 97.0), (lisi, 98.0), (zhangsan, 97.0))求每个 key的平均值。 def main(args: Array[String]): Unit {//1.入口创建SparkContextval conf new SparkConf().setMaster(local[*]).setAppName(spark)val sc new SparkContext(conf)//需求将数据 List((zhangsan, 99.0), (zhangsan, 96.0), (lisi, 97.0), (lisi, 98.0), (zhangsan, 97.0))求每个 key的平均值//2.准备数据val rdd1 sc.parallelize(List((zhangsan, 99.0), (zhangsan, 96.0), (lisi, 97.0), (lisi, 98.0), (zhangsan, 97.0)))//3.1 通过combineByKey方法将RDD安装key进行聚合返回值形式key,(值总和,key个数))val rdd2 rdd1.combineByKey(score (score,1),(scoreCount:(Double,Int),newScore:Double) (scoreCount._1newScore,scoreCount._21),(scoreCount1:(Double,Int),scoreCount2:(Double,Int)) (scoreCount1._1scoreCount2._1,scoreCount1._2scoreCount2._2))//打印combineByKey聚合之后的数据,形式key,(值总和,key个数))rdd2.collect().foreach(println) //(zhangsan,(292.0,3)) (lisi,(195.0,2))//3.2 将按值聚合相加后的结果(zhangsan,(292.0,3)) (lisi,(195.0,2))求每个人的平均值返回值形式key,平均值val result rdd2.map(item (item._1,item._2._1/item._2._2))//4 打印数据println(------------------)result.collect().foreach(println)} combineByKey()方法执行过程的图解
http://www.hkea.cn/news/14454848/

相关文章:

  • 做效果图的外包网站百度网盘网页
  • 中国文明网联盟网站建设嘉兴网站建设全包
  • 个人怎么做购物网站女程序员可以干到多少岁
  • 长沙seo网站排名优化公司学做软件的网站有哪些
  • 哪个网站推广比较好中国推广网
  • 来宾绍兴seo网站托管方案如何做网站网页流程
  • 设计素材网站服装舆情分析师需要具备哪些技能
  • 制作公司网站 价格红酒企业网站建设
  • 南京做网站制作公司湘潭网站建设 水平磐石网络
  • 网站左侧导航源码建筑公司排名前100强
  • 太原深圳建设工程信息网站医保局微网站开发
  • 个人网站设计报告书园区建设网站的方案
  • 宝山网站建设宝山开发公司管理软件
  • 网站内容需要备案吗sousou提交网站入口
  • 网站开发eq编辑器vr全景网站开发制作
  • 优质的专业网站建设平湖市规划建设局网站
  • 网站网页设计中怎么添加页码信息企业网站制作建站公司
  • 怎么做一个网站app吗广告策划书范本
  • 厦门响应式网站建设安卓市场wordpress主题
  • 装修类网站模板下载wordpress用什么开发工具
  • 上市公司查询网站做视频网站的空间
  • 石家庄网站建设求职简历apmserv安装wordpress
  • 选网站建设公司有什么注意的福田瑞沃自卸车官网
  • c 网站开发入门视频react网站开发
  • 网站建设学习多少钱wordpress主题网站
  • 免费源码资源源码站go企业网站黄页怎么做
  • 建网站资料网站开发流行
  • 制作英文网站多少钱手机版百度一下
  • 专门做电子书的网站品牌画册设计公司
  • 怎样利用云盘做电影网站衡阳专业的关键词优化终报价