wordpress 全站通知,泉州网站建站推广,更改网站名字,传奇怎么做充值网站下面介绍如何使用pyspark处理计算超大数据的统计指标#xff0c;主要为#xff1a;最大值、最小值、均值、方差、标准差、中位数、众数、非重复值等。
# 加载稽核数据
rd_sql fselect * from database.table
spark_data spark.sql(rd_sql)# 计算众数 由于spar…下面介绍如何使用pyspark处理计算超大数据的统计指标主要为最大值、最小值、均值、方差、标准差、中位数、众数、非重复值等。
# 加载稽核数据
rd_sql fselect * from database.table
spark_data spark.sql(rd_sql)# 计算众数 由于spark 2.4版本未内置相关函数 需要自定义
import pyspark.sql.functions as F
# 自定义mode的计算
def sparkdf_mode(df, cols):# 构建一个空数据框mode_df pd.DataFrame()# 循环每一列for col in cols:# 先过滤空值filtered_df df.filter(F.col(col).isNotNull())# 加个判断 防止数据全空置时报错if filtered_df.count()0:# 统计出现次数 排序grouped_counts filtered_df.groupBy(col).count().orderBy(F.col(count).desc())# 获取计数值最大的第一行first_row grouped_counts.first()# 转sparkdfpdf spark.createDataFrame([first_row], grouped_counts.columns).toPandas()[col]else:# 数据全空置 赋值Nonepdf pd.DataFrame({col: [None]}) # 拼接mode_df pd.concat([mode_df, pdf], axis1)return mode_dffrom pyspark.sql.functions import col, count, when, approx_count_distinct
# 分开统计 先统计字符类型
# 统计指标
string_stats spark_data.select(string_colsdate_cols).summary(max,min).toPandas()
# 非空值数量
string_nonull spark_data.select([count(when(col(c).isNotNull(), c)).alias(c) for c in (string_colsdate_cols)]).toPandas()
# 非重复值
string_unique spark_data.agg(*[approx_count_distinct(col(c)).alias(c) for c in (string_colsdate_cols)]).toPandas()
# 众数
string_mode sparkdf_mode(spark_data, (string_colsdate_cols))
# 添加空值占位
null_rows pd.DataFrame(None, indexnp.arange(len(string_stats), len(string_stats) 3), columnsstring_stats.columns)
string_stats string_stats.append(null_rows)
# 上下拼接
string_data pd.concat([string_stats.iloc[:, 1:], string_nonull, string_unique, string_mode])
print(fstring_data稽核完成)# 统计数值类型
# 统计指标
float_stats spark_data.select(float_cols).summary(max,min,mean,50%,stddev).toPandas()
print(ffloat_stats稽核完成)
# 非空值
float_nonull spark_data.select([count(when(col(c).isNotNull(), c)).alias(c) for c in float_cols]).toPandas()
# 非重复值
float_unique spark_data.agg(*[approx_count_distinct(col(c)).alias(c) for c in float_cols]).toPandas()
# 众数
float_mode sparkdf_mode(spark_data, float_cols)
# 上下拼接
float_data pd.concat([float_stats.iloc[:, 1:], float_nonull, float_unique, float_mode])
print(ffloat_data稽核完成)# 合并转置
pdf pd.concat([string_data, float_data], axis1).T
# 重命名
pdf.columns [max, min, mean, median, std, nonull_cnt, unique_cnt, mode]
# pdf转为sdf
sdf spark.createDataFrame(pdf)
# 创建临时视图 用于sqlAPI操作
sdf.createOrReplaceTempView(temp_view)
# 插入库表
spark.sql(finsert overwrite table database.table select * from temp_view)
# 用完删除临时视图
spark.catalog.dropTempView(temp_view)
# 关闭spark
spark.stop()