铁总建设函网站,百度ip地址,西安网站建设有限公司,网页链接制作生成文章目录 Pyspark catalog用法catalog 介绍cache 缓存表uncache 清除缓存表cleanCache 清理所有缓存表createExternalTable 创建外部表currentDatabase 返回当前默认库tableExists 检查数据表是否存在#xff0c;包含临时视图databaseExists 检查数据库是否存在dropGlobalTemp… 文章目录 Pyspark catalog用法catalog 介绍cache 缓存表uncache 清除缓存表cleanCache 清理所有缓存表createExternalTable 创建外部表currentDatabase 返回当前默认库tableExists 检查数据表是否存在包含临时视图databaseExists 检查数据库是否存在dropGlobalTempView 删除全局临时视图dropTempView 删除临时视图functionExists 检查函数是否存在getDatabase 获取具有指定名称的数据库getFunction 获取方法getTable 获取数据表isCached 检查是否缓存成功listCatalogs 列出可用的catalogslistColumns 返回数据表的列信息listDatabases 获取数据库列表listTables 获取数据表包含临时视图setCurrentDatabase 设置当前数据库refreshTable 刷新缓存refreshByPath 刷新路径recoverPartitions 恢复分区 Pyspark catalog用法
catalog 介绍
Catalog是Spark中用于管理元数据信息的接口这些元数据可能包括库、内部或外部表、函数、表列及临时视图等。
总的来说PySpark Catalogs是PySpark框架中用于管理和查询元数据的重要组件它使得Python用户能够更有效地利用PySpark进行大数据处理和分析。
spark SparkSession.builder.appName(LDSX_TEST) \.config(hive.metastore.uris, thrift://hadoop01:9083) \.config(spark.master,local[2] ) \.enableHiveSupport().getOrCreate()cache 缓存表
可以设置缓存等级默认缓存等级为MEMORY_AND_DISK,是数据表级别的缓存跟缓存dataframe存在区别,
设置不存在的表报错
# 缓存数据表
spark.catalog.cacheTable(ldsx_test.ldsx_table_one)
#检查是否缓存成功
ldsx spark.catalog.isCached(ldsx_test.ldsx_table_one)
Trueuncache 清除缓存表
当表不存在数据库会报错
spark.catalog.uncacheTable(ldsx_test.ldsx_table_one)cleanCache 清理所有缓存表
spark.catalog.clearCache()createExternalTable 创建外部表
# spark.catalog.createExternalTable(# tableNameldsx_test_table,# path ./ldsx_one.csv,# databaseldsx_test,## )currentDatabase 返回当前默认库
返回当前默认所在数据库spark.catalog.setCurrentDatabase 设置所在数据库
data spark.catalog.currentDatabase()tableExists 检查数据表是否存在包含临时视图
data spark.catalog.tableExists(ldsx_test.ldsx_table_one)
TruedatabaseExists 检查数据库是否存在
data spark.catalog.databaseExists(ldsx_test)dropGlobalTempView 删除全局临时视图
全局临时表查找时候需要指向global_temp
要删除的表不存在报错
#创建全局临时表
spark.createDataFrame([(1, 1)]).createGlobalTempView(my_table)
#注意查询时候需要指向 global_temp
spark.sql(select * from global_temp.my_table).show()
#删除全局临时
ldsx spark.catalog.dropGlobalTempView(my_table)dropTempView 删除临时视图
要删除的表不存在报错
#创建临时表
spark.createDataFrame([(1, 1)]).createTempView(my_table)
spark.sql(select * from my_table).show()
#删除临时表
ldsx spark.catalog.dropTempView(my_table)functionExists 检查函数是否存在
spark.catalog.functionExists(count)
TruegetDatabase 获取具有指定名称的数据库
data spark.catalog.getDatabase(ldsx_test)
print(data)
Database(nameldsx_test, catalogspark_catalog, description, locationUrihdfs://master:7171/home/ldsx/opt/hadoopData/hive_data/ldsx_test.db)getFunction 获取方法
获取不到方法报错
spark.sql(CREATE FUNCTION my_func1 AS test.org.apache.spark.sql.MyDoubleAvg)
data spark.catalog.getFunction(my_func1)
print(data)
Function(namemy_func1, catalogspark_catalog, namespace[default], descriptionN/A., classNametest.org.apache.spark.sql.MyDoubleAvg, isTemporaryFalse)getTable 获取数据表
获取不到表报错
data spark.catalog.getTable(ldsx_table_one)
print(data)
Table(nameldsx_table_one, catalogspark_catalog, namespace[ldsx_test], descriptionNone, tableTypeMANAGED, isTemporaryFalse)isCached 检查是否缓存成功
# 缓存数据表
spark.catalog.cacheTable(ldsx_test.ldsx_table_one)
data spark.catalog.isCached(ldsx_test.ldsx_table_one)
TruelistCatalogs 列出可用的catalogs
catalogs spark.catalog.listCatalogs()
print(catalogs)listColumns 返回数据表的列信息
# 参数数据表数据库
catalogs spark.catalog.listColumns(ldsx_table_one,ldsx_test)
print(catalogs)[Column(nameage, description??, dataTypestring, nullableTrue, isPartitionFalse, isBucketFalse),Column(namename, description??, dataTypestring, nullableTrue, isPartitionFalse, isBucketFalse),Column(namefraction, description??, dataTypestring, nullableTrue, isPartitionFalse, isBucketFalse),Column(nameclass, description??, dataTypestring, nullableTrue, isPartitionFalse, isBucketFalse),Column(namegender, description??, dataTypestring, nullableTrue, isPartitionFalse, isBucketFalse)]listDatabases 获取数据库列表
data1 spark.catalog.listDatabases()
print(data1)
[Database(namedefault, catalogspark_catalog, descriptionDefault Hive database,locationUrihdfs://master:7171/home/ldsx/opt/hadoopData/hive_data),
Database(nameldsx_test, catalogspark_catalog, description,locationUrihdfs://master:7171/home/ldsx/opt/hadoopData/hive_data/ldsx_test.db)]listTables 获取数据表包含临时视图
# 展示数据库中数据表以及临时视图
spark.catalog.setCurrentDatabase(ldsx_test)
spark.createDataFrame([(1,1)]).createTempView(TEST)
data spark.catalog.listTables()
print(data)
[Table(nameldsx_table_one, catalogspark_catalog, namespace[ldsx_test], descriptionNone,tableTypeMANAGED, isTemporaryFalse),Table(nameTEST, catalogNone, namespace[], descriptionNone, tableTypeTEMPORARY, isTemporaryTrue)]setCurrentDatabase 设置当前数据库
spark.catalog.setCurrentDatabase(ldsx_test)
data spark.catalog.currentDatabase()
print(data)ldsx_testrefreshTable 刷新缓存
看官网案例是刷新已经缓存的表
当一个表执行了cacheTable后元数据有变动使用refreshTable进行元数据刷新refreshByPath 刷新路径
# 假设有一个 Hive 表其数据存储在 HDFS 上的某个路径
path /user/hive/warehouse/mydb.db/mytable
# 刷新该路径下的表或分区信息
spark.catalog.refreshByPath(path)
df spark.sql(SELECT * FROM mydb.mytable)
df.show()recoverPartitions 恢复分区
recoverPartitions尝试恢复 Hive 表中丢失的分区信息实际使用后更新