采集站seo赚钱辅导班,网站怎么做地区屏蔽js,王烨岚,内蒙古网站制作公司1、将emp.csv、dept.csv文件上传到分布式环境#xff0c;再用
hdfs dfs -put dept.csv /input/
hdfs dfs -put emp.csv /input/
将本地文件put到hdfs文件系统的input目录下 2、或者调用本地文件也可以。区别#xff1a;sc.textFile(file:///D:\\temp\\emp.csv再用
hdfs dfs -put dept.csv /input/
hdfs dfs -put emp.csv /input/
将本地文件put到hdfs文件系统的input目录下 2、或者调用本地文件也可以。区别sc.textFile(file:///D:\\temp\\emp.csv) import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types._import spark.implicits._case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)val lines sc.textFile(hdfs://Master:9000/input/emp.csv).map(_.split(,))val allEmp lines.map(xEmp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))val allEmpDF allEmp.toDFallEmpDF.show StructType 是个case class,一般用于构建schema. 因为是case class,所以使用的时候可以不用new关键字
构造函数 可以传入Seq,List,Array,都是可以的~ 还可以用无参的构造器,因为它有一个无参的构造器.
例子
private val schema: StructType StructType(List(StructField(name, DataTypes.StringType),StructField(age, DataTypes.IntegerType))) 也可以是
private val schema: StructType StructType(Array(StructField(name, DataTypes.StringType),StructField(age, DataTypes.IntegerType))) 还可以调用无参构造器,这么写
private val schema (new StructType).add(StructField(name, DataTypes.StringType)).add(StructField(age, DataTypes.IntegerType)) 这个无参的构造器,调用了一个有参构造器.this里面是个方法,这个方法的返回值是Array类型,实际上就是无参构造器调用了主构造器
def this() this(Array.empty[StructField])case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {} import org.apache.spark.sql.types._val myschema StructType(List(StructField(empno,DataTypes.IntegerType),StructField(ename,DataTypes.StringType),StructField(job,DataTypes.StringType),StructField(mgr,DataTypes.StringType),StructField(hiredate,DataTypes.StringType),StructField(sal,DataTypes.IntegerType),StructField(comm,DataTypes.StringType),StructField(deptno,DataTypes.IntegerType)))val empcsvRDD sc.textFile(hdfs://Master:9000/input/emp.csv).map(_.split(,))import org.apache.spark.sql.Rowval rowRDDempcsvRDD.map(line Row (line(0).toInt,line(1),line(2),line(3),line(4),line(5).toInt,line(6),line(7).toInt))val df spark.createDataFrame(rowRDD,myschema) 将people.json文件上传到分布式环境
hdfs dfs -put people.json /inputhdfs dfs -put emp.json /input
//读json文件
val df spark.read.json(hdfs://Master:9000/input/emp.json)df.show df.select (ename).show df.select($ename).show df.select($ename,$sal,$sal100).show df.filter($sal2000).show df.groupBy($deptno).count.show df.createOrReplaceTempView(emp) spark.sql(select * from emp).show spark.sql(select * from emp where deptno10).show spark.sql(select deptno,sum(sal) from emp group by deptno).show //1 创建一个普通的 view 和一个全局的 viewdf.createOrReplaceTempView(emp1)df.createGlobalTempView(emp2)//2 在当前会话中执行查询均可查询出结果spark.sql(select * from emp1).showspark.sql(select * from global_temp.emp2).show//3 开启一个新的会话执行同样的查询spark.newSession.sql(select * from emp1).show //运行出错spark.newSession.sql(select * from global_temp.emp2).show //7、创建 Datasets//创建 DataSet方式一使用序列//1、定义 case classcase class MyData(a:Int,b:String)//2、生成序列并创建 DataSetval ds Seq(MyData(1,Tom),MyData(2,Mary)).toDS//3、查看结果ds.showds.collect //创建 DataSet方式二使用 JSON 数据//1、定义 case classcase class Person(name: String, gender: String)//2、通过 JSON 数据生成 DataFrameval df spark.read.json(sc.parallelize({gender: Male, name: Tom}:: Nil))//3、将 DataFrame 转成 DataSetdf.as[Person].showdf.as[Person].collect //创建 DataSet方式三使用 HDFS 数据val linesDS spark.read.text(hdfs://Master:9000/input/word.txt).as[String]val words linesDS.flatMap(_.split( )).filter(_.length 3)words.showwords.collect val result linesDS.flatMap(_.split( )).map((_,1)).groupByKey(x x._1).countresult.showresult.orderBy($value).show 1、将emp.json文件上传到分布式环境再用
hdfs dfs -put emp.json /input/
将本地文件put到hdfs文件系统的input目录下 //8、Datasets 的操作案例//1.使用 emp.json 生成 DataFrameval empDF spark.read.json(hdfs://Master:9000/input/emp.json)//查询工资大于 3000 的员工empDF.where($sal 3000).show//创建 case classcase classEmp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)//生成 DataSets并查询数据val empDS empDF.as[Emp]//查询工资大于 3000 的员工empDS.filter(_.sal 3000).show//查看 10 号部门的员工empDS.filter(_.deptno 10).show//多表查询//1、创建部门表val deptRDDsc.textFile(hdfs://Master:9000/input/dept.csv).map(_.split(,))case class Dept(deptno:Int,dname:String,loc:String)val deptDS deptRDD.map(xDept(x(0).toInt,x(1),x(2))).toDS//2、创建员工表case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)val empRDD sc.textFile(hdfs://Master:9000/input/emp.csv).map(_.split(,))val empDS empRDD.map(x Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS//3、执行多表查询等值链接val result deptDS.join(empDS,deptno)//另一种写法注意有三个等号val result deptDS.joinWith(empDS,deptDS(deptno)empDS(deptno))//查看执行计划result.explain