Contents
I. Add Dependencies
II. Configure log4j
III. Submitting the Jar with spark-submit
IV. Reading Files
(1) Loading Data
(2) Saving Data
1. Parquet
2. JSON
3. CSV
4. MySQL
5. Hive on Spark
6. Operating Hive from Spark in IDEA

I. Add Dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.1.2</spark.version>
    <mysql.version>8.0.29</mysql.version>
</properties>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-graphx -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
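Note: the _2.12 suffix in the Spark artifact IDs is the Scala binary version; it must match the Scala version the project compiles against (Scala 2.12 here).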
II. Configure log4j
src/main/resources/log4j.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

Copy log4j-defaults.properties into resources, rename it log4j.properties, and on line 19 change log4j.rootCategory=INFO, console to log4j.rootCategory=ERROR, console.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object SparkDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkDemo")
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    // println(sc) // org.apache.spark.SparkContext@63998bf4

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("sparkSessionDemo").getOrCreate()
    // println(spark) // org.apache.spark.sql.SparkSession@682c1e93

    val rdd: RDD[Int] = sc.parallelize(1 to 10)
    rdd.collect().foreach(println)

    val pNum: Int = rdd.getNumPartitions
    println(("partition count", pNum)) // (partition count,20)
  }
}
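Note: sc.parallelize without an explicit slice count uses spark.default.parallelism, which for local[*] equals the number of available cores; the 20 partitions reported above therefore reflect this machine's core count.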
III. Submitting the Jar with spark-submit
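For reference, a minimal sketch of what the submitted nj.zb.kb21.WordCount class might look like; the input path below is an assumption, since the actual source is not shown in this post:

package nj.zb.kb21

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("WordCount")
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    // Input path is assumed for illustration; adjust to the real file
    sc.textFile("file:///opt/data/words.txt")
      .flatMap(_.split(" "))   // split each line into words
      .map((_, 1))             // pair each word with a count of 1
      .reduceByKey(_ + _)      // sum the counts per word
      .collect()
      .foreach(println)        // prints (word,count) pairs like the output below
    sc.stop()
  }
}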
[root@lxm147 opt]# spark-submit --class nj.zb.kb21.WordCount --master local[*] ./sparkstu-1.0-SNAPSHOT.jar
(cjdison,1)
(spark,1)
(cdio,1)
(cjiodscn,1)
(hcuediun,1)
(hello,3)
(java,1)
(nodsn,1)
(jcido,1)
(jcndio,1)
(cjidsovn,1)
(world,1)

IV. Reading Files
(1) Loading Data

Spark SQL reads and saves files as Parquet by default.

scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

To read data in a different format, set it explicitly:

scala> spark.read.format("…")[.option("…")].load("…")

➢ format("…"): the type of data to load: csv, jdbc, json, orc, parquet, or textFile.
➢ load("…"): for the csv, jdbc, json, orc, parquet, and textFile formats, the path of the data to load.
➢ option("…"): for the jdbc format, the required JDBC parameters: url, user, password, and dbtable.

The approaches above use the read API to load the file into a DataFrame and then query it; you can also query the file directly: format.`file path`

# Read a local file
# Method 1
scala> spark.sql("select * from json.`file:///opt/soft/spark312/data/user.json`").show

# Method 2
scala> val df = spark.read.json("file:///opt/soft/spark312/data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

scala> df.show
+---+--------+
|age|username|
+---+--------+
| 30|zhangsan|
| 20|    lisi|
| 40|  wangwu|
+---+--------+

# Read a file on HDFS
# Method 1
scala> spark.sql("select * from json.`hdfs://lxm147:9000/data/user.json`").show
2023-03-30 16:25:09,132 WARN metastore.ObjectStore: Failed to get database json, returning NoSuchObjectException
+---+--------+
|age|username|
+---+--------+
| 30|zhangsan|
| 20|    lisi|
| 40|  wangwu|
+---+--------+

# Method 2
scala> spark.read.json("hdfs://lxm147:9000/data/user.json").show
+---+--------+
|age|username|
+---+--------+
| 30|zhangsan|
| 20|    lisi|
| 40|  wangwu|
+---+--------+

(2) Saving Data

df.write.save is the generic method for saving data.

scala> df.write.
bucketBy   csv   format   insertInto   jdbc   json   mode   option   options   orc   parquet   partitionBy   save   saveAsTable   sortBy   text

To save data in a different format, set it explicitly:

scala> df.write.format("…")[.option("…")].save("…")

➢ format("…"): the type of data to save: csv, jdbc, json, orc, parquet, or textFile.
➢ save("…"): for the csv, orc, parquet, and textFile formats, the path to save the data to.
➢ option("…"): for the jdbc format, the required JDBC parameters: url, user, password, and dbtable.

Save operations can take a SaveMode that specifies how to handle existing data; set it with the mode() method. One important point: these SaveModes take no locks and are not atomic. SaveMode is an enumeration whose constants are:

SaveMode.ErrorIfExists ("error"), the default: throw an exception if data already exists
SaveMode.Append ("append"): append to the existing data
SaveMode.Overwrite ("overwrite"): overwrite the existing data
SaveMode.Ignore ("ignore"): skip the write if data already exists

Examples

1. Save in a specified format

scala> df.write.format("json").save("file:///opt/soft/spark312/data/output1")

Saving to the same path twice fails because the path already exists:

scala> df.write.format("json").save("file:///opt/soft/spark312/data/output")
scala> df.write.format("json").save("file:///opt/soft/spark312/data/output")
org.apache.spark.sql.AnalysisException: path file:/opt/soft/spark312/data/output already exists.
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:122)

2. Append to the existing data (append)

scala> df.write.format("json").mode("append").save("file:///opt/soft/spark312/data/output")

3. Overwrite the existing data (overwrite)

scala> df.write.format("json").mode("overwrite").save("file:///opt/soft/spark312/data/output")

4. Ignore the write if data exists (ignore)

scala> df.write.format("json").mode("ignore").save("file:///opt/soft/spark312/data/output")
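In compiled code (or the shell) the SaveMode enum can be used instead of the mode string; a minimal sketch reusing df from above:

import org.apache.spark.sql.SaveMode

df.write.format("json").mode(SaveMode.Overwrite).save("file:///opt/soft/spark312/data/output")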
1. Parquet

# Read a Parquet file (Parquet is the default format, so no format needs to be specified)
scala> spark.read.load("file:///opt/soft/spark312/examples/src/main/resources/users.parquet")
# Save a DataFrame (here df1, loaded from JSON earlier) as a Parquet file
scala> df1.write.mode("append").save("file:///opt/soft/spark312/data/output1")
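Since Parquet is also the default read format, the saved files can be loaded back without naming a format; a sketch, assuming the directory holds only Parquet output:

scala> spark.read.load("file:///opt/soft/spark312/data/output1").show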
2. JSON

# Load a JSON file
scala> val df2 = spark.read.json("file:///opt/soft/spark312/data/user.json")
df2: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

# Create a temporary view
scala> df2.createOrReplaceTempView("user")

# Query the data
scala> spark.sql("select * from user where age > 30").show
+---+--------+
|age|username|
+---+--------+
| 40|  wangwu|
+---+--------+
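Note that the JSON source expects one complete JSON object per line (JSON Lines). For a pretty-printed file that spans multiple lines, enable the multiLine option:

scala> spark.read.option("multiLine", "true").json("file:///opt/soft/spark312/data/user.json").show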
3. CSV
# Read a CSV file
scala> val df = spark.read.format("csv").option("seq", ";").option("inferSchema", "true").option("header", "true").load("file:///opt/soft/spark312/examples/src/main/resources/people.csv")
df: org.apache.spark.sql.DataFrame = [name;age;job: string]

scala> df.show
+------------------+
|      name;age;job|
+------------------+
|Jorge;30;Developer|
|  Bob;32;Developer|
+------------------+

Note: the option key above is misspelled; the delimiter option is sep, not seq. The unknown option is silently ignored, so the semicolon-separated values land in a single column, as the output shows.
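For reference, the same read with the correct option key, which splits each record into name, age, and job columns:

scala> spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("file:///opt/soft/spark312/examples/src/main/resources/people.csv").show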
4. MySQL
Add the dependency:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>
package com.atguigu.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql._

import java.util.Properties

object Spark04_SparkSQL_JDBC {
  def main(args: Array[String]): Unit = {
    // TODO Create the Spark SQL runtime environment
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    // TODO Read data from MySQL
    // Method 1: read with the generic load method
    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.180.141:3306/exam")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "student")
      .load().show()

    // Method 2: the generic load method with the parameters in another form
    spark.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://192.168.180.141:3306/exam?user=root&password=root",
      "dbtable" -> "student",
      "driver" -> "com.mysql.cj.jdbc.Driver"
    )).load().show()

    // Method 3: read with the jdbc method
    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "root")
    val df: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.180.141:3306/exam", "users", props)
    df.show()

    // TODO Write data to MySQL
    df.write.format("jdbc")
      .option("url", "jdbc:mysql://192.168.180.141:3306/exam")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "student1")
      .mode(SaveMode.Append)
      .save()

    // TODO Stop the environment
    spark.stop()
  }
}
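For larger tables, the jdbc source can also read in parallel by range-partitioning on a numeric column; a sketch with the standard partitioning options (the id column and its bounds are assumptions about the student table):

spark.read.format("jdbc")
  .option("url", "jdbc:mysql://192.168.180.141:3306/exam")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("user", "root")
  .option("password", "root")
  .option("dbtable", "student")
  .option("partitionColumn", "id") // assumed numeric key column
  .option("lowerBound", "1")       // lower bound of id (assumed)
  .option("upperBound", "1000")    // upper bound of id (assumed)
  .option("numPartitions", "4")    // number of parallel reads
  .load()
  .show()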
5. Hive on Spark

To connect to an external, already-deployed Hive, the following steps are needed:
➢ For Spark to take over Hive, copy hive-site.xml into the conf/ directory:
cp /opt/soft/hive312/conf/hive-site.xml /opt/soft/spark-local/conf
➢ Copy the MySQL driver into the jars/ directory:
cp /opt/soft/hive312/lib/mysql-connector-java-8.0.29.jar /opt/soft/spark-local/jars
➢ If HDFS cannot be reached, also copy core-site.xml and hdfs-site.xml into the conf/ directory.
➢ Restart spark-shell.

scala> spark.sql("show tables").show
returning NoSuchObjectException
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

# Read a local file
scala> val df = spark.read.json("file:///opt/soft/spark-local/data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

# Create a temporary view
scala> df.createOrReplaceTempView("user")

# Query the temporary view
scala> spark.sql("select * from user").show
+---+--------+
|age|username|
+---+--------+
| 30|zhangsan|
| 20|    lisi|
| 40|  wangwu|
+---+--------+

# Create a table with its column defined
scala> spark.sql("create table atguigu(id int)")

# Load the data from a file on HDFS into the table
scala> spark.sql("load data inpath '/opt/soft/hive312/warehouse/atguigu/id.txt' into table atguigu")
res5: org.apache.spark.sql.DataFrame = []
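Note: load data inpath (without the local keyword) moves the source file from its original HDFS location into the table's warehouse directory rather than copying it.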
# Query the default database
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|  atguigu|      false|
|        |     user|       true|
+--------+---------+-----------+

# Query the data in the table
scala> spark.sql("select * from atguigu").show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+
6. Operating Hive from Spark in IDEA
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.29</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>
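With these dependencies in place, a Hive-enabled session can be created from IDEA. A minimal sketch, assuming hive-site.xml is on the classpath (for example under src/main/resources) and that the cluster accepts the root user for HDFS writes:

import org.apache.spark.sql.SparkSession

object SparkHiveDemo {
  def main(args: Array[String]): Unit = {
    // Run as a user with write access to the Hive warehouse (assumed user name)
    System.setProperty("HADOOP_USER_NAME", "root")
    // enableHiveSupport() connects the session to the Hive metastore
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("sparkHiveDemo")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("show tables").show()
    spark.stop()
  }
}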