广州网站建设优化,新渝网门户网,哪家培训机构好,如何创建微信小程序下单文章目录准备工作删除缺失值 3 的数据删除星级、评论数、评分中任意字段为空的数据删除非法数据hotel_data.csv通过编写Spark程序清洗酒店数据里的缺失数据、非法数据、重复数据准备工作
搭建 hadoop 伪分布或 hadoop 完全分布上传 hotal_data.csv 文件到 hadoopidea 配置…
文章目录准备工作删除缺失值 3 的数据删除星级、评论数、评分中任意字段为空的数据删除非法数据hotel_data.csv通过编写Spark程序清洗酒店数据里的缺失数据、非法数据、重复数据准备工作
搭建 hadoop 伪分布或 hadoop 完全分布上传 hotal_data.csv 文件到 hadoopidea 配置好 scala 环境
删除缺失值 3 的数据
读取 /hotel_data.csv删除缺失值 3 的数据 打印剔除的数量将清洗后的数据保存为/hotelsparktask1
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Demo01 {def main(args: Array[String]): Unit {// System.setProperty(HADOOP_USER_NAME, root)//解决保存文件权限不够的问题val config: SparkConf new SparkConf().setMaster(local[1]).setAppName(1)val sc new SparkContext(config)val hdfsUrl hdfs://192.168.226.129:9000val filePath: String hdfsUrl/file3_1/hotel_data.csvval data: RDD[Array[String]] sc.textFile(filePath).map(_.split(,)).cache()val total: Long data.count()val dataDrop: RDD[Array[String]] data.filter(_.count(_.equals(NULL)) 3)println(删除的数据条目有: (total - dataDrop.count()))dataDrop.map(_.mkString(,)).saveAsTextFile(hdfsUrl /hotelsparktask1)sc.stop()}
}删除星级、评论数、评分中任意字段为空的数据
读取 /hotel_data.csv将字段{星级、评论数、评分}中任意字段为空的数据删除, 打印剔除的数量保存 /hotelsparktask2
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Demo02 {def main(args: Array[String]): Unit {System.setProperty(HADOOP_USER_NAME, root)val config: SparkConf new SparkConf().setMaster(local[1]).setAppName(2)val sc new SparkContext(config)val hdfsUrl hdfs://192.168.226.129:9000val filePath: String hdfsUrl/file3_1/hotel_data.csvval data: RDD[Array[String]] sc.textFile(filePath).map(_.split(,)).cache()val total: Long data.count()val dataDrop: RDD[Array[String]] data.filter {arr: Array[String] !(arr(6).equals(NULL) || arr(10).equals(NULL) || arr(11).equals(NULL))}println(删除的数据条目有: (total - dataDrop.count()))dataDrop.map(_.mkString(,)).saveAsTextFile(hdfsUrl /hotelsparktask2)sc.stop()}
}删除非法数据
读取第一题的 /hotelsparktask1剔除数据集中评分和星级字段的非法数据合法数据是评分[05]的实数星级是指星级字段内容中包含 NULL、二星、三星、四星、五星的数据剔除数据集中的重复数据分别打印 删除含有非法评分、星级以及重复的数据条目数保存 /hotelsparktask3
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Demo03 {def main(args: Array[String]): Unit {System.setProperty(HADOOP_USER_NAME, root)//解决权限问题val config: SparkConf new SparkConf().setMaster( local[1]).setAppName(3)val sc new SparkContext(config)val hdfsUrl hdfs://192.168.226.129:9000val filePath: String hdfsUrl/hotelsparktask1val lines: RDD[String] sc.textFile(filePath).cache()val data: RDD[Array[String]] lines.map(_.split(,))val total: Long data.count()val dataDrop: RDD[Array[String]] data.filter {arr: Array[String] try {(arr(10).toDouble 0) (arr(10).toDouble 5)} catch {case _: Exception false}}val lab Array(NULL, 一星, 二星, 三星, 四星, 五星)val dataDrop1: RDD[Array[String]] data.filter { arr: Array[String] var flag falsefor (elem - lab) {if (arr(6).contains(elem)) {flag true}}flag}val dataDrop2: RDD[String] lines.distinctprintln(删除的非法评分数据条目有: (total - dataDrop.count()))println(删除的非法星级数据条目有: (total - dataDrop1.count()))println(删除重复数据条目有: (total - dataDrop2.count()))val wordsRdd: RDD[Array[String]] lines.distinct.map(_.split(,)).filter {arr: Array[String] try {(arr(10).toDouble 0) (arr(10).toDouble 5)} catch {case _: Exception false}}.filter { arr: Array[String] var flag falsefor (elem - lab) {if (arr(6).contains(elem)) {flag true}}flag}wordsRdd.map(_.mkString(,)).saveAsTextFile(hdfsUrl /hotelsparktask3)sc.stop()}
}hotel_data.csv
下载数据https://download.csdn.net/download/weixin_44018458/87437211