网站建设资金请示,珠海市工程造价信息网,太原关键词网站排名,网站开发中的服务抽离文章目录 Pyspark sql DataFrame相关文章RDDrepartition 重新分区replace 替换sameSemantics dataframe是否相等sample 采样sampleBy 分层采样schema 显示dataframe结构select 查询selectExpr 查询semanticHash 获取哈希值show 展示dataframesort 排序sortWithinPartitions 分区… 文章目录 Pyspark sql DataFrame相关文章RDDrepartition 重新分区replace 替换sameSemantics dataframe是否相等sample 采样sampleBy 分层采样schema 显示dataframe结构select 查询selectExpr 查询semanticHash 获取哈希值show 展示dataframesort 排序sortWithinPartitions 分区按照指定列排序stat 返回统计函数类型storageLevel 获取存储级别subtract 获取差集summary 总览tail 从结尾获取数据take 返回记录to 配合schema返回新结构的dataframe Pyspark sql DataFrame
相关文章
Pyspark下操作dataframe方法(1) Pyspark下操作dataframe方法(2) Pyspark下操作dataframe方法(3) Pyspark下操作dataframe方法(4) Pyspark下操作dataframe方法(5)
RDD
返回包含ROW对象的rdd
data.show()
-----------------
| name|age| id|gender|
-----------------
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
-----------------
data.rdd
MapPartitionsRDD[12] at javaToPython at NativeMethodAccessorImpl.java:0data.rdd.foreach(lambda x : print(type(x),x))
class pyspark.sql.types.Row Row(nametest3, age19, id1, gender女)
class pyspark.sql.types.Row Row(nametest4, age51, id1, gender女)
class pyspark.sql.types.Row Row(nametest5, age13, id1, gender男)
class pyspark.sql.types.Row Row(nameldsx, age12, id1, gender男)
class pyspark.sql.types.Row Row(nametest1, age20, id1, gender女)
class pyspark.sql.types.Row Row(nametest2, age26, id1, gender男)repartition 重新分区
每一个 RDD 包含的数据被存储在系统的不同节点上。逻辑上我们可以将 RDD 理解成一个大的数组数组中的每个元素就代表一个分区 (Partition) 。
在物理存储中每个分区指向一个存储在内存或者硬盘中的数据块 (Block) 其实这个数据块就是每个 task 计算出的数据块它们可以分布在不同的节点上。
RDD 只是抽象意义的数据集合分区内部并不会存储具体的数据只会存储它在该 RDD 中的 index通过该 RDD 的 ID 和分区的 index 可以唯一确定对应数据块的编号然后通过底层存储层的接口提取到数据进行处理。
data.show()
-----------------
| name|age| id|gender|
-----------------
| ldsx| 12| 1| 男|
|test1| 20| 1| 女|
|test2| 26| 1| 男|
|test3| 19| 1| 女|
|test4| 51| 1| 女|
|test5| 13| 1| 男|
-----------------
# 选择以某列进行分区
data.repartition(name).rdd.getNumPartitions()
1
# 指定分区数量进行分区可以指定多列
data.repartition(7,name,age).rdd.getNumPartitions()
7data data.repartition(5,gender)
data.rdd.glom().collect()[[], [Row(nameldsx, age12, id1, gender男), Row(nametest1, age20, id1, gender女),Row(nametest2, age26, id1, gender男), Row(nametest3, age19, id1, gender女),Row(nametest4, age51, id1, gender女), Row(nametest5, age13, id1, gender男)], [], [], []]# 直接操作rdd只能按数据分区不能按照列分区
data.rdd.repartition(1).glom().collect()
[[Row(nameldsx, age12, id1, gender男), Row(nametest2, age26, id1, gender男), Row(nametest5, age13, id1, gender男), Row(nametest1, age20, id1, gender女), Row(nametest3, age19, id1, gender女), Row(nametest4, age51, id1, gender女)]]data.repartition(2,id).rdd.glom().collect()
[[Row(nameldsx, age12, id1, gender男), Row(nametest1, age20, id1, gender女), Row(nametest2, age26, id1, gender男), Row(nametest3, age19, id1, gender女), Row(nametest4, age51, id1, gender女), Row(nametest5, age13, id1, gender男)], []]data.repartition(2).rdd.glom().collect()
[[Row(nametest2, age26, id1, gender男), Row(nametest3, age19, id1, gender女)], [Row(nametest1, age20, id1, gender女), Row(nameldsx, age12, id1, gender男), Row(nametest4, age51, id1, gender女), Row(nametest5, age13, id1, gender男)]]
replace 替换
当替换的值与原本列的数据类型不相同时会报错
df.show()
---------------
| age|height| name|
---------------
| 10| 80|Alice|
| 5| null| Bob|
|null| 10| Tom|
|null| null| null|
---------------
df.fillna({age:1,height:2,name:sr}).show()
--------------
|age|height| name|
--------------
| 10| 80|Alice|
| 5| 2| Bob|
| 1| 10| Tom|
| 1| 2| sr|
--------------df.na.replace([Alice, Bob], [A, B], name).show()
--------------
| age|height|name|
--------------
| 10| 80| A|
| 5| null| B|
|null| 10| Tom|
|null| null|null|
--------------df.show()
---------------
| age|height| name|
---------------
| 10| 80|Alice|
| 5| null| Bob|
|null| 10| Tom|
|null| null| null|
---------------df.na.replace(10,12).show()
---------------
| age|height| name|
---------------
| 12| 80|Alice|
| 5| null| Bob|
|null| 12| Tom|
|null| null| null|
---------------
sameSemantics dataframe是否相等
当两个 dataframe中的逻辑查询计划相等并因此返回相同的结果时返回 True。
data.show()
-----------------------
| name|age| id|gender|new_id|
-----------------------
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
-----------------------
data2.show()
-----------------------
| name|age| id|gender|new_id|
-----------------------
| ldsx| 12| 1| 男| 2.0|
|test1| 20| 1| 女| 2.0|
|test2| 26| 1| 男| 2.0|
|test3| 19| 1| 女| 2.0|
|test4| 51| 1| 女| 2.0|
|test5| 13| 1| 男| 2.0|
-----------------------data.sameSemantics(data2)
False
data.sameSemantics(data)
True
sample 采样
withReplacement:是否进行有放回采样默认为False表示进行无放回采样设置为True时表示进行有放回采样 fraction: 采样比例 float seed: 随机种子值值固定后采样获取固定默认为空
# 取样不固定
df.sample(0.1).show()
---
| id|
---
---
df.sample(0.1).show()
---
| id|
---
| 9|
---
df.sample(0.1).show()
---
| id|
---
| 1|
| 5|
---# 随机种子固定取样固定
df.sample(0.1,1).show()
---
| id|
---
| 3|
---
df.sample(0.1,1).show()
---
| id|
---
| 3|
---
sampleBy 分层采样
col列名
fractions: 采样字典
seed: 随机种子值值固定后采样获取固定默认为空
ataset spark.range(0, 100).select((col(id) % 3).alias(key))
dataset.show()---
|key|
---
| 0|
| 1|
| 2|
| 0|
| 1|
| 2|
...
...
| 0|
| 1|
| 2|
| 0|
| 1|
---# 列为key中值为0取样10%值为1取样10%值为2取样10%
dataset.sampleBy(key, fractions{0: 0.1, 1: 0.1,2:0.1}, seed0).show()
---
|key|
---
| 2|
| 0|
| 1|
| 2|
| 1|
| 2|
| 2|
| 1|
| 2|
---
# 列为key中值为0取样10%值为2取样10%
dataset.sampleBy(key, fractions{0: 0.1,2:0.1}, seed0).show()
---
|key|
---
| 2|
| 0|
| 2|
| 2|
| 2|
| 2|
---
schema 显示dataframe结构
将此DataFrame的架构作为pyspark.sql.types返回
df.schema
StructType([StructField(id, LongType(), False)])df.printSchema()
root|-- id: long (nullable false)select 查询
查询并返回新dataframe可结合多方法使用是。
df spark.createDataFrame([(2, Alice), (5, Bob)], schema[age, name])df.select(*).show()
--------
|age| name|
--------
| 2|Alice|
| 5| Bob|
--------df.select(df.name, (df.age 10).alias(age)).show()
--------
| name|age|
--------
|Alice| 12|
| Bob| 15|
--------selectExpr 查询
接受sql表达式并执行
df spark.createDataFrame([(2, Alice), (5, Bob)], schema[age, name])
df.show()
--------
|age| name|
--------
| 2|Alice|
| 5| Bob|
--------
df.selectExpr(age * 2,age2).show()
------------------
|(age * 2)|(age 2)|
------------------
| 4| 4|
| 10| 7|
------------------df.selectExpr(age * 2 as ldsx,age2).show()
-------------
|ldsx|(age 2)|
-------------
| 4| 4|
| 10| 7|
-------------semanticHash 获取哈希值
df.selectExpr(age * 2 as ldsx,age2).semanticHash()
-2082183221
df.semanticHash()
1019336781show 展示dataframe
展示前n行数据到控制台默认展示20行
df.show(1)
--------
|age| name|
--------
| 2|Alice|
--------
only showing top 1 rowsort 排序
按照指定列排序
from pyspark.sql.functions import desc, asc
# 下面方式效果一致
df.sort(desc(age)).show()
df.sort(age, ascendingFalse).show()
df.orderBy(df.age.desc()).show()
--------
|age| name|
--------
| 5| Bob|
| 2|Alice|
| 2| Bob|
--------# 使用两列排序一列降序一列默认升序
df.orderBy(desc(age), name).show()
--------
|age| name|
--------
| 5| Bob|
| 2|Alice|
| 2| Bob|
--------
# 使用两列排序都为降序
df.orderBy(desc(age), desc(name)).show()
--------
|age| name|
--------
| 5| Bob|
| 2| Bob|
| 2|Alice|
--------# 两列都为降序
df.orderBy([age, name], ascending[False, False]).show()
--------
|age| name|
--------
| 5| Bob|
| 2| Bob|
| 2|Alice|
--------sortWithinPartitions 分区按照指定列排序
df.sortWithinPartitions(age).show()
--------
|age| name|
--------
| 2|Alice|
| 2| Bob|
| 5| Bob|
--------stat 返回统计函数类型
df.stat
pyspark.sql.dataframe.DataFrameStatFunctions object at 0x7f55c87669e8storageLevel 获取存储级别
df.storageLevel
StorageLevel(False, False, False, False, 1)
df.cache().storageLevel
StorageLevel(True, True, False, True, 1)subtract 获取差集
返回一个新的DataFrame其中包含此DataFrame中的行但不包含另一个DataFrame中。d1.subtarct(d2),获取d1的差集。
df1 spark.createDataFrame([(a, 1), (a, 1), (b, 3), (c, 4)], [C1, C2])
df2 spark.createDataFrame([(a, 1), (a, 1), (b, 3)], [C1, C2])
df1.show()
------
| C1| C2|
------
| a| 1|
| a| 1|
| b| 3|
| c| 4|
------
df2.show()
------
| C1| C2|
------
| a| 1|
| a| 1|
| b| 3|
------
df1.subtract(df2).show()
------
| C1| C2|
------
| c| 4|
------
summary 总览
计算数值列和字符串列的指定统计信息。可用的统计数据有-count-mean-stddev-min-max-指定为百分比的任意近似百分位数 如果没有给出统计数据此函数将计算计数、平均值、标准偏差、最小值、近似四分位数百分位数分别为25%、50%和75%和最大值。
df.show()
--------------------
| name|age|weight|height|
--------------------
| Bob| 13| 40.3| 150.5|
|Alice| 12| 37.8| 142.3|
| Tom| 11| 44.1| 142.2|
--------------------df.summary().show()
24/09/19 11:24:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting spark.sql.debug.maxToStringFields.---------------------------------------------------
|summary| name| age| weight| height|
---------------------------------------------------
| count| 3| 3| 3| 3|
| mean| null|12.0|40.733333333333334| 145.0|
| stddev| null| 1.0| 3.172275734127371|4.763402145525822|
| min|Alice| 11| 37.8| 142.2|
| 25%| null| 11| 37.8| 142.2|
| 50%| null| 12| 40.3| 142.3|
| 75%| null| 13| 44.1| 150.5|
| max| Tom| 13| 44.1| 150.5|
---------------------------------------------------tail 从结尾获取数据
运行尾部需要将数据移动到应用程序的驱动程序进程中如果使用非常大的num可能会导致驱动程序进程因OutOfMemoryError而崩溃。
df.show()
--------------------
| name|age|weight|height|
--------------------
| Bob| 13| 40.3| 150.5|
|Alice| 12| 37.8| 142.3|
| Tom| 11| 44.1| 142.2|
--------------------
df.tail(2)
[Row(nameAlice, age12, weight37.8, height142.3), Row(nameTom, age11, weight44.1, height142.2)]take 返回记录
head 调用的就是taketake调用的limit
# 源码def take(self, num: int) - List[Row]:Returns the first num rows as a :class:list of :class:Row... versionadded:: 1.3.0.. versionchanged:: 3.4.0Supports Spark Connect.Parameters----------num : intNumber of records to return. Will return this number of recordsor all records if the DataFrame contains less than this number of records..Returns-------listList of rowsExamples-------- df spark.createDataFrame(... [(14, Tom), (23, Alice), (16, Bob)], [age, name])Return the first 2 rows of the :class:DataFrame. df.take(2)[Row(age14, nameTom), Row(age23, nameAlice)]return self.limit(num).collect()to 配合schema返回新结构的dataframe
from pyspark.sql.types import StructField, StringType
df spark.createDataFrame([(a, 1)], [i, j])
df.show()
------
| i| j|
------
| a| 1|
------
df.schema
StructType([StructField(i, StringType(), True), StructField(j, LongType(), True)])# 设置新的scheam
schema StructType([StructField(j, StringType()), StructField(i, StringType())])
df.schema
StructType([StructField(i, StringType(), True), StructField(j, LongType(), True)])# df使用新的scheam进行转换查看scheam
df.to(schema).schema
# 顺序改变字段类型改变
StructType([StructField(j, StringType(), True), StructField(i, StringType(), True)])
df.to(schema).show()
------
| j| i|
------
| 1| a|
------# 当schema设置原df不存在的列则会默认补充null
schema StructType([StructField(q, StringType()), StructField(w, StringType()),StructField(i, StringType())])
df.to(schema).show()
-----------
| q| w| i|
-----------
|null|null| a|
-----------