当前位置: 首页 > news >正文

成都建站培训外贸网站如何做的好处

成都建站培训,外贸网站如何做的好处,网站制作公司 深圳,网站建设网络推广一、UDF函数定义 #xff08;1#xff09;函数定义 #xff08;2#xff09;Spark支持定义函数 #xff08;3#xff09;定义UDF函数 #xff08;4#xff09;定义返回Array类型的UDF #xff08;5#xff09;定义返回字典类型的UDF 二、窗口函数 #xff08;11函数定义 2Spark支持定义函数 3定义UDF函数 4定义返回Array类型的UDF 5定义返回字典类型的UDF 二、窗口函数 1开窗函数简述 2窗口函数的语法 一、UDF函数定义 1函数定义 无论Hive还是SparkSQL分析处理数据时往往需要使用函数SparkSQL模块本身自带很多实现公共功能的函数在pyspark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF尤其是UDF函数在实际项目中使用最为广泛。         Hive中自定义函数有三种类型 第一种UDFUser-Defined_-function函数 · 一对一的关系输入一个值经过函数以后输出一个值 · 在Hive中继承UDF类方法名称为evaluate返回值不能为void其实就是实现一个方法 第二种UDAFUser-Defined Aggregation Function聚合函数 · 多对一的关系输入多个值输出一个值通常于groupBy联合使用 第三种UDTFUser-Defined Table-Generating Functions函数 · 一对多的关系输入一个值输出多个值一行变多为行 · 用户自定义生成函数有点像flatMap 2Spark支持定义函数 目前来说Spark框架各个版本及各种语言对自定义函数的支持在SparkSQL中目前仅仅支持UDF函数和UDAF函数目前Python仅支持UDF。 Spark版本及支持函数定义 Apache Spark VersionSpark SQL UDFPythonJavaScalaSpark SQL UDAFJavaScalaSpark SQL UDFRHive UDFUDAFUDTF1.1-1.4√√1.5√experimental√1.6√√√2.0√√√√ 3定义UDF函数 ①sparksession.udf.register() 注册的UDF可以用于DSL和SQL返回值用于DSL风格传参内给的名字用于SQL风格。 ②pyspark.sql.functions.udf 仅能用于DSL风格。 其中F是from pyspark.sql import functions as F。其中被注册为UDF的方法名是指具体的计算方法如def add(x, y): x y  。 add就是将要被注册成UDF的方法名 # cording:utf8 from pyspark.sql import SparkSession import pyspark.sql.functions as F from pyspark.sql.types import IntegerType, StringType, StructType if __name__ __main__:spark SparkSession.builder.appName(udf_define).master(local[*]).getOrCreate()sc spark.sparkContext# 构建一个RDDrdd sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])df rdd.toDF([num])# TODO 1:方式1 sparksession.udf.register(),DSL和SQL风格均可使用# UDF的处理函数def num_ride_10(num):return num * 10# 参数1注册的UDF的名称这个UDF名称仅可以用于SQL风格# 参数2UDF的处理逻辑是一个单独定义的方法# 参数3声明UDF的返回值类型注意UDF注册时候必要声明返回值类型并且UDF的真实返回值一定要和声明的返回值一致# 当前这种方式定义的UDF可以通过参数1的名称用于SQL风格通过返回值对象用户的DSL风格udf2 spark.udf.register(udf1, num_ride_10, IntegerType())# SQL风格中使用# selectExpr 以SELECT的表达式执行表达式SQL风格的表达式字符串# select方法接受普通的字符串字段名或者返回值时Column对象的计算df.selectExpr(udf1(num)).show()# DSL 风格使用# 返回值UDF对象如果作为方法使用传入的参数一定是Column对象df.select(udf2(df[num])).show()# TODO 2:方式2注册仅能用于DSL风格udf3 F.udf(num_ride_10, IntegerType())df.select(udf3(df[num])).show() 方式1结果 方式2结果 4定义返回Array类型的UDF 注意数组或者list类型可以使用spark的ArrayType来描述即可。 注意声明ArrayType要类似这样:ArrayType(StringType())在ArrayType中传入数组内的数据类型。 # cording:utf8 from pyspark.sql import SparkSession import pyspark.sql.functions as F from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType if __name__ __main__:spark SparkSession.builder.appName(udf_define).master(local[*]).getOrCreate()sc spark.sparkContext# 构建一个RDDrdd sc.parallelize([[hadoop spark flink], [hadoop flink java]])df rdd.toDF([line])# 注册UDFUDF的执行函数定义def split_line(data):return data.split( )# TODO 1:方式1 后见UDFudf2 spark.udf.register(udf1, split_line, ArrayType(StringType()))# DLS 风格df.select(udf2(df[line])).show()# SQL风格df.createTempView(lines)spark.sql(SELECT udf1(line) FROM lines).show(truncateFalse)# TODO 2:方式的形式构建UDFudf3 F.udf(split_line, ArrayType(StringType()))df.select(udf3(df[line])).show(truncateFalse) 5定义返回字典类型的UDF 注意字典类型返回值可以用StructType来进行描述StructType是—个普通的Spark支持的结构化类型.         只是可以用在                 · DF中用于描述Schema                 · UDF中用于描述返回值是字典的数据 # cording:utf8 import string from pyspark.sql import SparkSession import pyspark.sql.functions as F from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType if __name__ __main__:spark SparkSession.builder.appName(udf_define).master(local[*]).getOrCreate()sc spark.sparkContext# 假设 有三个数字 1 2 3 在传入数字返回数字所在序号对应的 字母 然后和数字结合组成dict返回# 例传入1 返回{num:1, letters: a}rdd sc.parallelize([[1], [2], [3]])df rdd.toDF([num])# 注册UDFdef process(data):return {num: data, letters: string.ascii_letters[data]}UDF返回值是字典的话需要用StructType来接收udf1 spark.udf.register(udf1, process, StructType().add(num, IntegerType(), nullableTrue).\add(letters, StringType(), nullableTrue))# SQL风格df.selectExpr(udf1(num)).show(truncateFalse)# DSL风格df.select(udf1(df[num])).show(truncateFalse)6通过RDD构建UDAF函数 # cording:utf8 import string from pyspark.sql import SparkSession import pyspark.sql.functions as F from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType if __name__ __main__:spark SparkSession.builder.appName(udf_define).master(local[*]).getOrCreate()sc spark.sparkContextrdd sc.parallelize([1, 2, 3, 4, 5], 3)df rdd.map(lambda x: [x]).toDF([num])# 方法使用RDD的mapPartitions 算子来完成聚合操作# 如果用mapPartitions API 完成UDAF聚合一定要单分区single_partition_rdd df.rdd.repartition(1)def process(iter):sum 0for row in iter:sum row[num]return [sum] # 一定要嵌套list因为mapPartitions方法要求返回值是list对象print(single_partition_rdd.mapPartitions(process).collect())二、窗口函数 1开窗函数简述 ●介绍 开窗函数的引入是为了既显示聚集前的数据又显示聚集后的数据。即在每一行的最后一列添加聚合函数的结果。 开窗用于为行定义一个窗口(这里的窗口是指运算将要操作的行的集合)它对一组值进行操作不需要使用GROUP BY子句对数据进行分组能够在同一行中同时返回基础行的列和聚合列。 ●聚合函数和开窗函数 聚合函数是将多行变成一行count,avg... 开窗函数是将一行变成多行; 聚合函数如果要显示其他的列必须将列加入到group by中开窗函数可以不使用group by直接将所有信息显示出来。 ●开窗函数分类 1.聚合开窗函数 聚合函数(列)OVER(选项)这里的选项可以是PARTITION BY子句但不可以是ORDER BY子句。 2.排序开窗函数 排序函数(列)OVER(选项)这里的选项可以是ORDER BY子句也可以是OVER(PARTITION BY子句ORDER BY子句)但不可以是PARTITION BY子句。 3.分区类型NTILE的窗口函数 2窗口函数的语法 窗口函数的语法 # cording:utf8 import string from pyspark.sql import SparkSession import pyspark.sql.functions as F from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType if __name__ __main__:spark SparkSession.builder.appName(udf_define).master(local[*]).getOrCreate()sc spark.sparkContextrdd sc.parallelize([(张三, class_1, 99),(王五, class_2, 35),(王三, class_3, 57),(王久, class_4, 12),(王丽, class_5, 99),(王娟, class_1, 90),(王军, class_2, 91),(王俊, class_3, 33),(王君, class_4, 55),(王珺, class_5, 66),(郑颖, class_1, 11),(郑辉, class_2, 33),(张丽, class_3, 36),(张张, class_4, 79),(黄凯, class_5, 90),(黄开, class_1, 90),(黄恺, class_2, 90),(王凯, class_3, 11),(王凯杰, class_1, 11),(王开杰, class_2, 3),(王景亮, class_3, 99)])schema StructType().add(name, StringType()).\add(class, StringType()).\add(score, IntegerType())df rdd.toDF(schema)# 创建表df.createTempView(stu)# TODO 1:聚合窗口函数的演示spark.sql(SELECT *, AVG(score) over() AS avg_socre FROM stu).show()# TODO 2 排序相关的窗口函数计算# RANK over, DENSE_RANK over, ROW_NUMBER overspark.sql(SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank,DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank,RANK() OVER(ORDER BY score) AS RANKFROM stu).show()# TODO NTILEspark.sql(SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu).show()TODO1结果 TODO2结果展示 TODO3结果展示
http://www.hkea.cn/news/14351950/

相关文章:

  • 深圳网站优化公司哪家好南京网站seo服务
  • 怎样让网站被百度收录网站建设实训总结报告
  • 做视频网站服务器配置网站中的动态统计图如何做
  • 网站开发运行环境论文企业怎么注册163邮箱
  • 汉中微信网站建设推广wordpress搭建镜像
  • 支付宝手机网站中国工程机械网官网
  • 长春网站建设网站网站建好后维护麻烦吗
  • 做淘宝客的的网站有什么要求吗网站代码字体变大
  • 给自己做网站seo优化教程自学
  • 大连金普新区规划建设局网站有教人做衣服的网站
  • 南昌做网站电话如何制作flash网站
  • 自己电脑做网站教程深圳机场最新消息今天
  • 石油网页设计与网站建设网站推广优化外包公司哪家好
  • 义乌网站设计广州市公共资源交易中心
  • 有什么网站是可以做动态图的人人站cms
  • 网站建设招标范文郑州网站开发便宜
  • 常州网站建设公司报价免费ppypp网站
  • 重庆注册公司核名在哪个网站做电影网站要买什么
  • 自助网站建设价格平面设计和网页设计哪个工资高
  • 外国教程网站有哪些做网站滨州市
  • 网站建设和网站设计公司在哪里最新seo视频教程
  • 有哪些做企业网站的新增备案网站
  • 天一建设网站网站建设大作业电子版
  • 网站建设书籍资料免费云电脑
  • 网站咨询聊天怎么做建筑设计图设计说明
  • 金华公司建站模板服装网站建设的目的
  • 代做毕设的网站石家庄小程序开发多少钱
  • 写作网站平台大连市建设工程老网站
  • 深圳品牌网站设计泉州做网站优化哪家好
  • 做网站虚拟主机哪里有做外贸自己公司的网站一定要吗