一个超链接 多个网站,湘潭培训网站建设,北京王府井大楼,化州市建设局网站半山腰太挤了#xff0c;你该去山顶看看 —— 24.11.10 一、输出为python对象
1.collect算子
功能:
将RDD各个分区内的数据#xff0c;统一收集到Driver中#xff0c;形成一个List对象
语法#xff1a;
rdd.collect()
返回值是一个list列表
示例#xff1a;
from … 半山腰太挤了你该去山顶看看 —— 24.11.10 一、输出为python对象
1.collect算子
功能:
将RDD各个分区内的数据统一收集到Driver中形成一个List对象
语法
rdd.collect()
返回值是一个list列表
示例
from pyspark import SparkConf,SparkContext
import osconf SparkConf().setMaster(local).setAppName(test_spark)
os.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.exe
sc SparkContext(conf conf)Set {小明,小红,小强}
Tuple (小明,小红,小强)set_rdd sc.parallelize(Set)
tuple_rdd sc.parallelize(Tuple)print(set_rdd.collect())
print(tuple_rdd.collect()) 2.reduce算子
功能:
对RDD数据集按照你传入的逻辑进行聚合
语法:
rdd.reduce(func)rdd sc.parallelize(range(1 , 10))
# 将rdd的数据进行累加求和
print(rdd.reduce(lambda a , b : a b))
返回值等同于计算函数的返回值
示例
from pyspark import SparkContext,SparkConf
import os
import jsonos.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.execonf SparkConf().setMaster(local).setAppName(test_spark)
sc SparkContext(conf conf)List [1,2,3,4,5,6,7,8,9]
rdd sc.parallelize(List)
print(rdd.reduce(lambda x, y : x y))3.take算子
功能
取RDD的前N个元素组合成list返回
语法
sc.parallelize([3,2,1,4,5,6]).take(5) # [32145] 返回前n个元素组成的list
示例
from pyspark import SparkContext,SparkConf
import os
import jsonos.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)
List (1,2,3,4,5,6,7,8,9)
rdd sc.parallelize(List)
res rdd.take(4)
print(前四个元素为res) 4.count算子
功能
计算RDD有多少条数据
语法
sc.parallelize([3,2,1,4,5,6]).count()
返回值是一个数字
示例
from pyspark import SparkConf,SparkContext
import os
import jsonos.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.exe
conf SparkConf().setMaster(local[*]).setAppName(test_spark)
sc SparkContext(confconf)rdd sc.parallelize([yyh,hl,grq,zxj,cby,wfe,mrr,qjy])
print(rdd.count()) 二、输出到文件中
1.saveAsTextFile算子
功能
将RDD的数据写入文本文件中
支持本地写出、 hdfs等文件系统
语法
rdd sc.parallelize([12345])
rdd.saveAsTextFile(../data/output/test.txt)
2.配置Hadoop相关依赖
调用保存文件的算子需要配置Hadoop依赖
① 下载Hadoop安装包
http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
② 解压到电脑任意位置 ③ 在Python代码中使用os模块配置:
os.environ[HADOOP HOME]HADOOP解压文件夹路径
E:\python.learning\hadoop分布式相关\hadoop-3.0.0
④ 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe ⑤ 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内
https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll 3.代码示例
from pyspark import SparkConf,SparkContext
import osconf SparkConf().setMaster(local).setAppName(test_spark)
os.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.exe
sc SparkContext(conf conf)# 准备RDD1
rdd1 sc.parallelize([1,2,3,4,5])# 准备RDD2
rdd2 sc.parallelize([(Hello, 3),(Spark, 5),(Hi, 7)])# 准备RDD3
rdd3 sc.parallelize([[1, 3, 5],[6, 7, 9],[11, 13, 11]])# 输出到文件中
rdd1.saveAsTextFile(E:\python.learning\hadoop分布式相关\data\output1/rdd1)
rdd2.saveAsTextFile(E:\python.learning\hadoop分布式相关\data\output2/rdd2)
rdd3.saveAsTextFile(E:\python.learning\hadoop分布式相关\data\output3/rdd3)注如果输出路径的文件存在代码将会报错 4.运行结果 创建几个文件取决于Hadoop上的分区数量
解决方式修改rdd的分区 5.修改rdd分区为1个
方式1
Sparkconf对象设置属性全局并行度为1
from pyspark import SparkConf, SparkContext
import os
os.environ[PYSPARK_PYTHON] E:/python.learning/pyt/scripts/python.exe
os.environ[HADOOP_HOME] E:\python.learning\hadoop分布式相关\hadoop-3.0.0
conf SparkConf().setMaster(local).setAppName(test_spark)
conf.set(spark.default.parallelize, 1)
sc SparkContext(conf conf)# 准备RDD1
rdd1 sc.parallelize([1,2,3,4,5])# 准备RDD2
rdd2 sc.parallelize([(Hello, 3),(Spark, 5),(Hi, 7)])# 准备RDD3
rdd3 sc.parallelize([[1, 3, 5],[6, 7, 9],[11, 13, 11]])# 输出到文件中
rdd1.saveAsTextFile(E:\python.learning\hadoop分布式相关\data\output1/rdd1)
rdd2.saveAsTextFile(E:\python.learning\hadoop分布式相关\data\output2/rdd2)
rdd3.saveAsTextFile(E:\python.learning\hadoop分布式相关\data\output3/rdd3)方式2
创建RDD的时候设置 parallelize方法传入numSlices参数为1
rdd1 sc.parallelize([1,2,3,4,5],1)