一个空间放多个网站,网站目录层级建设,展示型装饰网站模板,网站是怎么优化的因为RDD在spark中是分布式存储
1、python中定义的变量仅仅在driver中运行#xff0c;在excutor中是获取不到值的——广播变量
2、若定义了一个变量进行累加#xff0c;先分别在driver和excutor中进行累加#xff0c;但是结果是不会主动返回给driver的——累加器
Broadcas…因为RDD在spark中是分布式存储
1、python中定义的变量仅仅在driver中运行在excutor中是获取不到值的——广播变量
2、若定义了一个变量进行累加先分别在driver和excutor中进行累加但是结果是不会主动返回给driver的——累加器
Broadcast Variables广播变量 driver中存放python变量广播到别的excutor中 若不使用就会每个task存放一个 不能修改只能读 通过value使用该变量
if __name__ __main__:# 配置环境os.environ[JAVA_HOME] D:/Program Files/Java/jdk1.8.0_271# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] D:/hadoop-3.3.1/hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 获取 conf 对象# setMaster 按照什么模式运行local bigdata01:7077 yarn# local[2] 使用2核CPU * 你本地资源有多少核就用多少核# appName 任务的名字conf SparkConf().setMaster(local[*]).setAppName(第一个Spark程序)# 假如我想设置压缩# conf.set(spark.eventLog.compression.codec,snappy)# 根据配置文件得到一个SC对象第一个conf 是 形参的名字第二个conf 是实参的名字sc SparkContext(confconf)fileRdd sc.textFile(../datas/user.tsv,2)city_dict {1: 北京,2: 上海,3: 广州,4: 深圳,5: 苏州,6: 无锡,7: 重庆,8: 厦门,9: 大理,10: 成都}# 将一个变量广播出去广播到executor中不是task中city_dict_broad sc.broadcast(city_dict)# 广播变量# class pyspark.broadcast.Broadcastprint(type(city_dict_broad ))# class dictprint(type(city_dict_broad.value))def getLine(line):list01 line.split( )#cityName city_dict.get(int(list01[3]))# 使用广播变量的变量获取数据cityName city_dict_broad.value.get(int(list01[3]))# print(cityName)return line cityNamemapRdd fileRdd.map(getLine)mapRdd.foreach(print)# 释放广播变量city_dict_broad.unpersist()# 使用完后记得关闭sc.stop() 累加器
将所有的excutor中的变量返回到driver中进行汇总。
否则变量是放在excutor中的而打印的是driver中变量值不会改变。
用于修改——汇总
import os
import reimport jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel
------------------------------------------Description : TODOSourceFile : _06SouGou案例Author : yangeDate : 2024/10/31 星期四
-------------------------------------------if __name__ __main__:# 配置环境os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_241# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] D:/hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 获取 conf 对象# setMaster 按照什么模式运行local bigdata01:7077 yarn# local[2] 使用2核CPU * 你本地资源有多少核就用多少核# appName 任务的名字conf SparkConf().setMaster(local[*]).setAppName(搜索热词案例)# 假如我想设置压缩# conf.set(spark.eventLog.compression.codec,snappy)# 根据配置文件得到一个SC对象第一个conf 是 形参的名字第二个conf 是实参的名字sc SparkContext(confconf)mapRdd sc.textFile(../../datas/zuoye/sogou.tsv,minPartitions8) \.filter(lambda line:len(re.split(\s,line)) 6) \.map(lambda line:(re.split(\s,line)[0],re.split(\s,line)[1],re.split(\s,line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)# 统计一天每小时点击量并按照点击量降序排序_sum 0def sumTotalLine(tuple1):global _sum # 把_sum 设置为全局变量timeStr tuple1[0] # 10:19:18if timeStr[0:2] 10:_sum 1mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))print(_sum) # 结果是0# 使用完后记得关闭sc.stop()
上面程序最终结果是0因为 sum0 是在 Driver 端的内存中的executor 中程序再累加也是无法改变 Driver 端的结果的。下面的则为正确的
import os
import reimport jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel
------------------------------------------Description : TODOSourceFile : _06SouGou案例Author : yangeDate : 2024/10/31 星期四
-------------------------------------------if __name__ __main__:# 配置环境os.environ[JAVA_HOME] C:/Program Files/Java/jdk1.8.0_241# 配置Hadoop的路径就是前面解压的那个路径os.environ[HADOOP_HOME] D:/hadoop-3.3.1# 配置base环境Python解析器的路径os.environ[PYSPARK_PYTHON] C:/ProgramData/Miniconda3/python.exe # 配置base环境Python解析器的路径os.environ[PYSPARK_DRIVER_PYTHON] C:/ProgramData/Miniconda3/python.exe# 获取 conf 对象# setMaster 按照什么模式运行local bigdata01:7077 yarn# local[2] 使用2核CPU * 你本地资源有多少核就用多少核# appName 任务的名字conf SparkConf().setMaster(local[*]).setAppName(搜索热词案例)# 假如我想设置压缩# conf.set(spark.eventLog.compression.codec,snappy)# 根据配置文件得到一个SC对象第一个conf 是 形参的名字第二个conf 是实参的名字sc SparkContext(confconf)accCounter sc.accumulator(0)mapRdd sc.textFile(../../datas/zuoye/sogou.tsv,minPartitions8) \.filter(lambda line:len(re.split(\s,line)) 6) \.map(lambda line:(re.split(\s,line)[0],re.split(\s,line)[1],re.split(\s,line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)# 统计一天每小时点击量并按照点击量降序排序#_sum 0def sumTotalLine(tuple1):#global _sum # 把_sum 设置为全局变量timeStr tuple1[0] # 10:19:18if timeStr[0:2] 10:accCounter.add(1)mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))print(accCounter.value) # 104694# 假如我不知道累加器这个操作这个题目怎么做print(mapRdd.filter(lambda tuple1: tuple1[0][0:2] 10).count())# 使用完后记得关闭sc.stop()