可以做微网站的第三方平台,无锡做企业网站,郑州电力高等专科学校招生网,北京网站优化企业SparkSQL与Hive整合
SparkSQL和Hive的整合#xff0c;是一种比较常见的关联处理方式#xff0c;SparkSQL加载Hive中的数据进行业务处理#xff0c;同时将计算结果落地回Hive中。
整合需要注意的地方
1)需要引入hive的hive-site.xml#xff0c;添加classpath目录下面即可…SparkSQL与Hive整合
SparkSQL和Hive的整合是一种比较常见的关联处理方式SparkSQL加载Hive中的数据进行业务处理同时将计算结果落地回Hive中。
整合需要注意的地方
1)需要引入hive的hive-site.xml添加classpath目录下面即可或者放到$SPARK_HOME/conf。
2)为了能够正常解析hive-site.xml中hdfs路径需要将hdfs-site.xml和core-site.xml到classpath下面。整合编码如下:
object Hive_Support {def main(args: Array[String]): Unit {//创建sparkSql程序入口val spark: SparkSession SparkSession.builder().appName(demo).master(local[*]).enableHiveSupport().getOrCreate()//调用sparkContextval sc: SparkContext spark.sparkContext//设置日志级别sc.setLogLevel(WARN)//导包import spark.implicits._//查询hive当中的表spark.sql(show tables).show()//创建表spark.sql(CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by )//导入数据spark.sql(load data local inpath./person.txt into table person)//查询表当中数据spark.sql(select * from person).show()}
}
SparkSQL函数操作
函数的定义
SQL中函数其实说白了就是各大编程语言中的函数或者方法就是对某一特定功能的封装通过它可以完成较为复杂的统计。这里的函数的学习就基于Hive中的函数来学习。
函数的分类
函数的分类方式非常多主要从功能和实现方式上进行区分。
实现方式上分类
1)UDF(User Defined function)用户自定义函数一路输入一路输出比如yeardate_add, instr。
2)UDAF(User Defined aggregation function)用户自定义聚合函数多路输入一路输出常见的聚合函数count、sum、collect_list。
3)UDTF(User Defined table function)用户自定义表函数一路输入多路输出explode。
4)开窗函数row_number()sum/max/min over。
用户自定义函数
当系统提供的这些函数满足不了我们的需要的话就只能进行自定义相关的函数一般自定义的函数两种UDF和UDAF。
1)UDF一路输入一路输出完成就是基于scala函数。
通过模拟获取字符串长度的udf来学习自定义udf操作。
object UDF_Demo {def main(args: Array[String]): Unit {//创建sparkSql程序入口val spark: SparkSession SparkSession.builder().appName(demo).master(local[*]).getOrCreate()//调用sparkContextval sc: SparkContext spark.sparkContext//设置日志级别sc.setLogLevel(WARN)//导包import spark.implicits._//加载文件val personDF: DataFrame spark.read.json(E:\\data\\people.json)//展示数据//personDF.show()//注册成为一张表personDF.createOrReplaceTempView(t_person)//赋予什么功能val fun (x:String){Name:x}//没有addName这个函数就注册它spark.udf.register(addName,fun)//查询spark.sql(select name,addName(name) from t_person).show()//释放资源spark.stop()}}
2)开窗函数over()开窗函数是按照某个字段分组然后查询出另一字段的前几个的值相当于分组取topN。
row_number() over (partitin by XXX order by XXX)
rank() 跳跃排序有两个第二名是后边跟着的是第四名
dense_rank() 连续排序有两个第二名是后边跟着的是第三名
row_number() 连续排序两个值相同排序也是不同
在使用聚合函数后会将多行变成一行而over()开窗函数其实就是给每个分组的数据按照其排序的顺序打上一个分组内的行号直接将所有列信息显示出来。在使用聚合函数后如果要显示其它的列必须将列加入到group by中而使用开窗函数后可以不使用group by。
代码如下
case class StudentScore(name:String,clazz:Int,score:Int)
object SparkSqlOverDemo {def main(args: Array[String]): Unit {val conf new SparkConf().setMaster(local[*]).setAppName(sparksqlover)val sc new SparkContext(conf)val spark SparkSession.builder().config(conf).getOrCreate()val arr01 Array((a,1,88),(b,1,78),(c,1,95),(d,2,74),(e,2,92),(f,3,99),(g,3,99),(h,3,45),(i,3,53),(j,3,78))import spark.implicits._val scoreRDD sc.makeRDD(arr01).map(xStudentScore(x._1,x._2,x._3)).toDSscoreRDD.createOrReplaceTempView(t_score)//查询t_score表数据spark.sql(select * from t_score).show()//使用开窗函数查找topN,rank() 跳跃排序有两个第二名是后边跟着的是第四名spark.sql(select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score ).show()//讲使用开窗函数后的查询结果作为一张临时表这个临时表有每个班的成绩排名再取前三名spark.sql(select * from (select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score) t1 where rownum 3 ).show()}
}