当前位置: 首页 > news >正文

上饶专业的企业网站开发公司番禺网站建设三杰科技

上饶专业的企业网站开发公司,番禺网站建设三杰科技,做织梦网站时图片路径显示错误,网站ico如何添加数据输入 RDD对象 如图可见#xff0c;PySpark支持多种数据的输入#xff0c;在输入完成后#xff0c;都会得到一个#xff1a;RDD类的对象 RDD全称为#xff1a;弹性分布式数据集#xff08;Resilient Distributed Datasets#xff09; PySpark针对数据的处理…数据输入 RDD对象 如图可见PySpark支持多种数据的输入在输入完成后都会得到一个RDD类的对象 RDD全称为弹性分布式数据集Resilient Distributed Datasets PySpark针对数据的处理都是以RDD对象作为载体即 数据存储在RDD内各类数据的计算方法也都是RDD的成员方法RDD的数据计算方法返回值依旧是RDD对象 PySpark的编程模型上图可以归纳为 准备数据到RDD - RDD迭代计算 - RDD导出为list、文本文件等即源数据 - RDD - 结果数据 Python数据容器转RDD对象 PySpark支持通过SparkContext对象的parallelize成员方法将 listtuplesetdictstr 转换为PySpark的RDD对象 注意 字符串会被拆分出1个个的字符存入RDD对象字典仅有key会被存入RDD对象 读取文件转RDD对象 PySpark也支持通过SparkContext入口对象来读取文件来构建出RDD对象。 总结 1. RDD对象是什么为什么要使用它 RDD对象称之为分布式弹性数据集是PySpark中数据计算的载体它可以 提供数据存储提供数据计算的各类方法数据计算的方法返回值依旧是RDDRDD迭代计算 后续对数据进行各类计算都是基于RDD对象进行 2. 如何输入数据到Spark即得到RDD对象 通过SparkContext的parallelize成员方法将Python数据容器转换为RDD对象通过SparkContext的textFile成员方法读取文本文件得到RDD对象 数据计算 map方法 PySpark的数据计算都是基于RDD对象来进行的那么如何进行呢 自然是依赖RDD对象内置丰富的成员方法算子 语法 演示PySpark代码加载数据即数据输入from pyspark import SparkConf, SparkContextconf SparkConf().setMaster(local[*]).setAppName(test_spark) sc SparkContext(confconf)# # 通过parallelize方法将Python对象加载到Spark内成为RDD对象 rdd1 sc.parallelize([1, 2, 3, 4, 5]) rdd2 sc.parallelize((1, 2, 3, 4, 5)) rdd3 sc.parallelize(abcdefg) rdd4 sc.parallelize({1, 2, 3, 4, 5}) rdd5 sc.parallelize({key1: value1, key2: value2, key3: value3})# # 如果要查看RDD里边有什么内容需要用collect方法 print(rdd1.collect()) print(rdd2.collect()) print(rdd3.collect()) print(rdd4.collect()) print(rdd5.collect()) # 用过textFile方法读取文件数据加载到Spark内成为RDD对象 rdd sc.textFile(E:/百度网盘/1、Python快速入门8天零基础入门到精通/资料/第15章资料/资料/hello.txt) print(rdd.collect()) sc.stop() 总结 1. map算子成员方法 接受一个处理函数可用lambda表达式快速编写对RDD内的元素逐个处理并返回一个新的RDD 2. 链式调用 对于返回值是新RDD的算子可以通过链式调用的方式多次调用算子。 flatMap方法 总结 flatMap算子 计算逻辑和map一样可以比map多出解除一层嵌套的功能 reduceByKey方法 PySpark代码加载数据reduceByKey方法 针对KV型 RDD 自动按照key分组,然后根据你提供的聚合逻辑完成组内数(value)的聚合操作.二元元祖 from pyspark import SparkConf, SparkContext # 配置Python解释器 import os os.environ[PYSPARK_PYTHON] D:/Python/Python311/python.execonf SparkConf().setMaster(local[*]).setAppName(test_spark) sc SparkContext(confconf)rdd sc.parallelize([(男, 99), (女, 88),(女,99), (男,77), (男, 55)]) # 需求求男生和女生俩个组的成绩之和 rdd2 rdd.reduceByKey(lambda a, b: a b) print(rdd2.collect())总结 reduceByKey算子 接受一个处理函数对数据进行两两计算 练习案例1 WordCount案例 使用学习到的内容完成 读取文件统计文件内单词的出现数量 hello.txt itheima itheima itcast itheima spark python spark python itheima itheima itcast itcast itheima python python python spark pyspark pyspark itheima python pyspark itcast spark 完成练习案例单词计数统计# 1.构建执行环境入口对象 from pyspark import SparkConf, SparkContext import os os.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe conf SparkConf().setMaster(local[*]).setAppName(test_spark) sc SparkContext(confconf) # 2.读取数据 rdd sc.textFile(E:/百度网盘/1、Python快速入门8天零基础入门到精通/资料/第15章资料/资料/hello.txt) # 3.取出全部单词 wor_rdd rdd.flatMap(lambda a: a.split( )) # print(wor_rdd.collect()) # 4.将所有单词都转换成二元元组单词为keyValue设置为1 word_with_one_rdd wor_rdd.map(lambda word: (word, 1)) # print(word_with_one_rdd.collect()) # 5.分组并求和 result word_with_one_rdd.reduceByKey(lambda a, b: a b) # 6.打印输出结果 print(result.collect())结果 [(python, 6), (itheima, 7), (itcast, 4), (spark, 4), (pyspark, 3)]filter方法 功能过滤想要的数据进行保留 PySpark代码加载数据Filter方法from pyspark import SparkConf, SparkContext # 配置Python解释器 import osos.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe conf SparkConf().setMaster(local[*]).setAppName(test_spark) sc SparkContext(confconf)# 准备一个RDD rdd sc.parallelize([1, 2, 3, 4, 5, 6, 7]) # 对RDD的数据进行过滤 rdd2 rdd.filter(lambda num: num % 2 0) # 整数返回true 奇数返回falseprint(rdd2.collect())结果 [2, 4, 6] 总结 filter算子 接受一个处理函数可用lambda快速编写函数对RDD数据逐个处理得到True的保留至返回值的RDD中 distinct方法 功能对RDD数据进行去重返回新的RDD 语法 rdd.distinct       无需传参 PySpark代码加载数据distinct方法 去重 无需传参from pyspark import SparkConf, SparkContext # 配置Python解释器 import osos.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe conf SparkConf().setMaster(local[*]).setAppName(test_spark) sc SparkContext(confconf)# 准备一个RDD rdd sc.parallelize([1, 1, 2, 3, 3, 3, 5, 6, 7, 7, 7, 7, 7]) # 对RDD的数据进行去重 rdd2 rdd.distinct() print(rdd2.collect())结果 [1, 2, 3, 5, 6, 7]总结 distinct算子 完成对RDD内数据的去重操作 sortBy方法 功能对RDD数据进行排序基于你指定的排序依据 PySpark代码加载数据sortBy方法 排序 语法: rdd.sortBy(func,ascendingFalse, numPartitions1) # func:(T)U:告知按照rdd中的哪个数据进行排序 比如lambda x:x[1]表示按照rdd中的第二列元素进行排序 # ascending True升序 False降序 # numPartitions:用多少分区排序# 1.构建执行环境入口对象 from pyspark import SparkConf, SparkContext import os os.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe conf SparkConf().setMaster(local[*]).setAppName(test_spark) sc SparkContext(confconf) # 2.读取数据 rdd sc.textFile(E:/百度网盘/1、Python快速入门8天零基础入门到精通/资料/第15章资料/资料/hello.txt) # 3.取出全部单词 wor_rdd rdd.flatMap(lambda a: a.split( )) # 4.将所有单词都转换成二元元组单词为keyValue设置为1 word_with_one_rdd wor_rdd.map(lambda word: (word, 1)) # 5.分组并求和 result word_with_one_rdd.reduceByKey(lambda a, b: a b) # 6.打印输出结果 print(result.collect()) # 7.对结果进行排序 a result.sortBy(lambda x: x[1], ascendingFalse, numPartitions1) # 降序 print(a.collect())b result.sortBy(lambda x: x[1], ascendingTrue, numPartitions1) # 升序 print(b.collect())结果 [(python, 6), (itheima, 7), (itcast, 4), (spark, 4), (pyspark, 3)] [(itheima, 7), (python, 6), (itcast, 4), (spark, 4), (pyspark, 3)] [(pyspark, 3), (itcast, 4), (spark, 4), (python, 6), (itheima, 7)]总结 sortBy算子 接收一个处理函数可用lambda快速编写函数表示用来决定排序的依据可以控制升序或降序全局排序需要设置分区数为1 练习案例2 案例 {id:1,timestamp:2019-05-08T01:03.00Z,category:平板电脑,areaName:北京,money:1450}|{id:2,timestamp:2019-05-08T01:01.00Z,category:手机,areaName:北京,money:1450}|{id:3,timestamp:2019-05-08T01:03.00Z,category:手机,areaName:北京,money:8412} {id:4,timestamp:2019-05-08T05:01.00Z,category:电脑,areaName:上海,money:1513}|{id:5,timestamp:2019-05-08T01:03.00Z,category:家电,areaName:北京,money:1550}|{id:6,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:杭州,money:1550} {id:7,timestamp:2019-05-08T01:03.00Z,category:电脑,areaName:北京,money:5611}|{id:8,timestamp:2019-05-08T03:01.00Z,category:家电,areaName:北京,money:4410}|{id:9,timestamp:2019-05-08T01:03.00Z,category:家具,areaName:郑州,money:1120} {id:10,timestamp:2019-05-08T01:01.00Z,category:家具,areaName:北京,money:6661}|{id:11,timestamp:2019-05-08T05:03.00Z,category:家具,areaName:杭州,money:1230}|{id:12,timestamp:2019-05-08T01:01.00Z,category:书籍,areaName:北京,money:5550} {id:13,timestamp:2019-05-08T01:03.00Z,category:书籍,areaName:北京,money:5550}|{id:14,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:北京,money:1261}|{id:15,timestamp:2019-05-08T03:03.00Z,category:电脑,areaName:杭州,money:6660} {id:16,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:天津,money:6660}|{id:17,timestamp:2019-05-08T01:03.00Z,category:书籍,areaName:北京,money:9000}|{id:18,timestamp:2019-05-08T05:01.00Z,category:书籍,areaName:北京,money:1230} {id:19,timestamp:2019-05-08T01:03.00Z,category:电脑,areaName:杭州,money:5551}|{id:20,timestamp:2019-05-08T01:01.00Z,category:电脑,areaName:北京,money:2450} {id:21,timestamp:2019-05-08T01:03.00Z,category:食品,areaName:北京,money:5520}|{id:22,timestamp:2019-05-08T01:01.00Z,category:食品,areaName:北京,money:6650} {id:23,timestamp:2019-05-08T01:03.00Z,category:服饰,areaName:杭州,money:1240}|{id:24,timestamp:2019-05-08T01:01.00Z,category:食品,areaName:天津,money:5600} {id:25,timestamp:2019-05-08T01:03.00Z,category:食品,areaName:北京,money:7801}|{id:26,timestamp:2019-05-08T01:01.00Z,category:服饰,areaName:北京,money:9000} {id:27,timestamp:2019-05-08T01:03.00Z,category:服饰,areaName:杭州,money:5600}|{id:28,timestamp:2019-05-08T01:01.00Z,category:食品,areaName:北京,money:8000}|{id:29,timestamp:2019-05-08T02:03.00Z,category:服饰,areaName:杭州,money:7000} 需求复制以上内容到文件中使用Spark读取文件进行计算 各个城市销售额排名从大到小全部城市有哪些商品类别在售卖北京市有哪些商品类别在售卖 使用Spark读取文件进行计算 各个城市销售额排名从大到小 全部城市有哪些商品类别在售卖 北京市有哪些商品类别在售卖import json from pyspark import SparkConf, SparkContext import os os.environ[PYSPARK_PYTHON] D:/Python/Python311/python.exe conf SparkConf().setMaster(local[*]).setAppName(test_spark) sc SparkContext(confconf)# TOD0 需求1城市销售额排名 # 1.1 读取文件得到RDD rdd sc.textFile(E:/百度网盘/1、Python快速入门8天零基础入门到精通/资料/第15章资料/资料/orders.txt) # print(rdd.collect())# 1.2取出一个个JSON字符串 json_str rdd.flatMap(lambda a: a.split(|)) # print(json_str.collect())# 1.3将一个个JSON字符串转换为字典 my_dict json_str.map(lambda x: json.loads(x)) # print(my_dict.collect())# 1.4 取出城市和销售额数据 # 城市销售额 city_with_money_rdd my_dict.map(lambda x: (x[areaName], int(x[money]))) # print(city_with_money_rdd.collect())# 1.5 按城市分组按销售额聚合 city_result city_with_money_rdd.reduceByKey(lambda a, b: a b) # print(city_result.collect())# 1.6 按销售额聚合结果进行排序 sorting city_result.sortBy(lambda a: a[1], ascendingFalse, numPartitions1) print(f需求1的结果是{sorting.collect()})# TODD 需求2全部城市有哪些商品类别在售卖 city_with_category_rdd my_dict.map(lambda x: (x[category])).distinct() print(f需求2的结果是{city_with_category_rdd.collect()})# TODD 需求3北京市有哪些商品类别在售卖 # 3.1 过滤北京是市的数据 bj_dict my_dict.filter(lambda a: a[areaName] 北京) # print(bj_dict.collect())# # 3.2 取出全部商品列表 # bj_category bj_dict.map(lambda a: a[category]) # print(bj_category.collect())# # 3.3 进行商品类别去重 # bj_category_distinct bj_category.distinct() # print(f北京市售卖的商品有{bj_category_distinct.collect()})# 3.2 取出全部商品列表 进行商品类别去重 bj_category bj_dict.map(lambda a: a[category]).distinct() print(f北京市售卖的商品有{bj_category.collect()})结果 需求1的结果是[(北京, 91556), (杭州, 28831), (天津, 12260), (上海, 1513), (郑州, 1120)]需求2的结果是[电脑, 家电, 食品, 平板电脑, 手机, 家具, 书籍, 服饰]北京市售卖的商品有[家电, 电脑, 食品, 平板电脑, 手机, 家具, 书籍, 服饰] 数据输出 输出为Python对象 数据输入 sc.parallelizesc.textFile 数据计算 rdd.maprdd.flatMaprdd.reduceByKey... collect算子 功能将rdd各个分区内的数据统一收集到Driver中形成一个List对象 用法 rdd.collect 返回一个List reduce算子 功能对RDD数据集按照你传入的逻辑进行聚合 返回值等同于计算函数的返回值 take算子 功能取RDD的前N个元素组合成List返回给你 用法 sc.parallelize([1, 2, 65, 5, 8, 841, 2, 48, 12, 21, 48]).take(6)结果: [1, 2, 65, 5, 8, 841] count算子 功能计算RDD有多少条数据返回值是一个数字 总结 1. Spark的编程流程就是 将数据加载为RDD数据输入对RDD进行计算数据计算将RDD转换为Python对象数据输出 2. 数据输出的方法 collect将RDD内容转换为listreduce对RDD内容进行自定义聚合take取出RDD的前N个元素组成listcount统计RDD元素个数 数据输出可用的方法是很多的简单的介绍了4个。 输出到文件中 saveAsTextFile算子 功能将RDD的数据写入文本文件中 支持 本地写成hdfs等文件系统 代码 rdd sc.parallelize([1, 2, 3, 4, 5, 6]) rdd.saveAsTextFile(D:/output) 注意事项 调用保存文件的算子需要配置Hadoop依赖 下载Hadoop安装包 解压到电脑任意位置在Python代码中使用os模块配置os.environ[‘HADOOP_HOME’] ‘HADOOP解压文件夹路径’下载winutils.exe并放入Hadoop解压文件夹的bin目录内下载hadoop.dll并放入:C:/Windows/System32 文件夹内 修改rdd分区为1个 方式1SparkConf对象设置属性全局并行度为1 conf SparkConf().setMaster(local[*]).setAppName(test_spark) # 设置spark全局并行度为1 conf.set(spark.default.parallelism, 1)sc SparkContext(confconf) 方式2创建RDD的时候设置parallelize方法传入numSlices参数为1 rdd1 sc.parallelize([1, 2, 3, 4, 5, 6], numSlices1) # 设置分区为1rdd1 sc.parallelize([1, 2, 3, 4, 5, 6], 1) # 设置分区为1 总结 1. RDD输出到文件的方法 rdd.saveAsTextFile(路径)输出的结果是一个文件夹有几个分区就输出多少个结果文件 2. 如何修改RDD分区 SparkConf对象设置conf.set(spark.default.parallelism, 1)创建RDD的时候sc.parallelize方法传入numSlices参数为1
http://www.hkea.cn/news/14355945/

相关文章:

  • 公司网站能否申请国外免费空间网站与域名的关系
  • 做企业网站需要买什么优化模型的推广
  • 重庆哪里做网站网络营销相关理论
  • 四字母net做网站怎么样网站备案要幕布
  • 企业网站功能怎么设计wordpress4.6.9
  • 直播做ppt的网站有哪些沈阳红方城网站建设
  • 深圳企业建网站公司网站建设个人工作室
  • 用织梦做领券网站怎么用织梦做网站后台
  • 网站重定向怎么做万江做网站的公司
  • 企业建站用什么软件厦门海沧区建设局网站
  • 上饶建站公司静态网站建设平台
  • 网站建设公司哪家比较好建设一个网站需要什么技术
  • 个人网站怎么推广专业网站模仿
  • 旅游网站建设经费预算网络及it维护外包
  • 东莞网站推广教程扬中经济
  • 湖北德升建站品牌营销案例分析
  • 有关网站开发的文章网站的国际化 怎么做
  • 幼儿网站模板温州专业手机网站制作哪家便宜
  • 深圳 网站建设设计九口袋网站建设
  • 温州 网站建设公司网站没有备案是不是违法的
  • 校内 实训网站 建设网站类网站开发
  • 佛山网站建设找千界如何避免网站被耍流量
  • 网站分享做描点链接wordpress 跨站调用
  • 自己建设网站不会咋办呀杭州清风室内设计培训学校
  • 新网互联 网站上传修改 wordpress footer
  • 旅游网站开发研究背景国外做直播网站
  • 阿里巴巴网站国际站建设网架公司的螺丝是不是公司安好
  • 做关于时尚网站的目的学生建设网站
  • 佛山网站建设公司哪个性比价好些wordpress可以接广告吗
  • 南宁seo建站廊坊手机模板建站