当前位置: 首页 > news >正文

建设共享经济网站的可行性网站后台不显示文章内容

建设共享经济网站的可行性,网站后台不显示文章内容,百度竞价培训班,网站地图深度做多少合适背景 Spark 3.5.0 目前 Spark中的实现中#xff0c;对于多分区的写入默认会先排序#xff0c;这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。 分析 这部分主要分为三个部分: 一个是V1Writes规则的重改; 另一个是FileFormatWriter中…背景 Spark 3.5.0 目前 Spark中的实现中对于多分区的写入默认会先排序这是没必要的。可以设置spark.sql.maxConcurrentOutputFileWriters 为大于0来避免排序。 分析 这部分主要分为三个部分: 一个是V1Writes规则的重改; 另一个是FileFormatWriter中的dataWriter的选择; 还有一个是Spark中为什么会加上Sort 这三部分是需要结合在一起分析讨论的 V1Writes规则的重改 直接转到代码部分: object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {import V1WritesUtils._override def apply(plan: LogicalPlan): LogicalPlan {if (conf.plannedWriteEnabled) {plan.transformUp {case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] val newQuery prepareQuery(write, write.query)val attrMap AttributeMap(write.query.output.zip(newQuery.output))val writeFiles WriteFiles(newQuery, write.fileFormat, write.partitionColumns,write.bucketSpec, write.options, write.staticPartitions)val newChild writeFiles.transformExpressions {case a: Attribute if attrMap.contains(a) a.withExprId(attrMap(a).exprId)}val newWrite write.withNewChildren(newChild :: Nil).transformExpressions {case a: Attribute if attrMap.contains(a) a.withExprId(attrMap(a).exprId)}newWrite}} else {plan}}其中 prepareQuery是对满足条件的计划前加上Sort逻辑排序其中prepareQuery关键的代码如下: val requiredOrdering write.requiredOrdering.map(_.transform {case a: Attribute attrMap.getOrElse(a, a)}.asInstanceOf[SortOrder])val outputOrdering empty2NullPlan.outputOrderingval orderingMatched isOrderingMatched(requiredOrdering.map(_.child), outputOrdering)if (orderingMatched) {empty2NullPlan} else {Sort(requiredOrdering, global false, empty2NullPlan)}write.requiredOrdering中涉及到的类为InsertIntoHadoopFsRelationCommand和InsertIntoHiveTable,且这两个物理计划中的requiredOrdering实现都是 V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)getSortOrder方法关键代码如下 val sortColumns V1WritesUtils.getBucketSortColumns(bucketSpec, dataColumns)if (SQLConf.get.maxConcurrentOutputFileWriters 0 sortColumns.isEmpty) {// Do not insert logical sort when concurrent output writers are enabled.Seq.empty} else {// We should first sort by dynamic partition columns, then bucket id, and finally sorting// columns.(dynamicPartitionColumns writerBucketSpec.map(_.bucketIdExpression) sortColumns).map(SortOrder(_, Ascending))}所以说 如果 spark.sql.maxConcurrentOutputFileWriters为0默认值为0则会加上Sort逻辑计划具体的实现可以参考SPARK-37287 如果spark.sql.maxConcurrentOutputFileWriters为0默认值为0且 sortColumns为空(大部分情况下为空除非建表是partition加上bucket),则不会加上Sort逻辑计划 FileFormatWriter 中的dataWriter的选择 在 InsertIntoHadoopFsRelationCommand和InsertIntoHiveTable 这两个物理计划中最终写入文件/数据的时候会调用到FileFormatWriter.write方法这里有个concurrentOutputWriterSpecFunc函数变量的设置 val concurrentOutputWriterSpecFunc (plan: SparkPlan) {val sortPlan createSortPlan(plan, requiredOrdering, outputSpec)createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)}val writeSpec WriteFilesSpec(description description,committer committer,concurrentOutputWriterSpecFunc concurrentOutputWriterSpecFunc)executeWrite(sparkSession, plan, writeSpec, job)设置concurrentOutputWriterSpecFunc的代码如下 private def createConcurrentOutputWriterSpec(sparkSession: SparkSession,sortPlan: SortExec,sortColumns: Seq[Attribute]): Option[ConcurrentOutputWriterSpec] {val maxWriters sparkSession.sessionState.conf.maxConcurrentOutputFileWritersval concurrentWritersEnabled maxWriters 0 sortColumns.isEmptyif (concurrentWritersEnabled) {Some(ConcurrentOutputWriterSpec(maxWriters, () sortPlan.createSorter()))} else {None}}如果 spark.sql.maxConcurrentOutputFileWriters为0默认值为0,则ConcurrentOutputWriterSpec为None 如果 spark.sql.maxConcurrentOutputFileWriters大于0且 sortColumns为空(大部分情况下为空除非建表是partition加上bucket)则为Some(ConcurrentOutputWriterSpec(maxWriters, () sortPlan.createSorter()) 其中executeWrite会调用WriteFilesExec.doExecuteWrite方法,从而调用FileFormatWriter.executeTask,这里就涉及到dataWriter选择 val dataWriter if (sparkPartitionId ! 0 !iterator.hasNext) {// In case of empty job, leave first partition to save meta for file format like parquet.new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)} else if (description.partitionColumns.isEmpty description.bucketSpec.isEmpty) {new SingleDirectoryDataWriter(description, taskAttemptContext, committer)} else {concurrentOutputWriterSpec match {case Some(spec) new DynamicPartitionDataConcurrentWriter(description, taskAttemptContext, committer, spec)case _ new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)}}这里其实会根据 concurrentOutputWriterSpec来选择不同的dataWriter,默认情况下为DynamicPartitionDataSingleWriter 否则就会为DynamicPartitionDataConcurrentWriter 这两者的区别见下文 Spark中为什么会加上Sort 至于Spark在写入文件的时候会加上Sort这个是跟写入的实现有关的也就是DynamicPartitionDataSingleWriter和DynamicPartitionDataConcurrentWriter的区别: DynamicPartitionDataSingleWriter 在任何时刻只有一个writer在写文件这能保证写入的稳定性不会在写入文件的时候消耗大量的内存但是速度会慢DynamicPartitionDataConcurrentWriter 会有多个 writer 同时写文件能加快写入文件的速度但是因为多个文件的同时写入可能会导致OOM 对于DynamicPartitionDataSingleWriter 会根据partition或者bucket作为最细粒度来作为writer的标准如果相邻的两条记录所属不同的partition或者bucket则会切换writer所以说如果不根据partition或者bucket排序的话会导致writer频繁的切换这会大大降低文件的写入速度。所以说需要根据partition或者bucket进行排序。 参考 [SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter[SQL] Allow FileFormatWriter to write multiple partitions/buckets without sort
http://www.hkea.cn/news/14454012/

相关文章:

  • 怎么用mvc架构做网站免费无线
  • 简单的房源展示网站开发html购物网站
  • 使用cdn做网站内容加速python 网站开发流程图
  • 安全的赣州网站建设做网站公司 深圳信科
  • 网站如何加后台临沂市建设工程多图联审系统 网站
  • 培训人员网站建设网站如何做排名
  • 代理网站在线怀化市住房和城乡建设局网站
  • 宝安公司网站建设比较好的商业策划书
  • 宁波网站建设 华企立方凡科 建设淘宝客网站
  • dz网站建设教程上海做网站哪家好
  • 校园门户网站系统建设网站平均停留时间
  • it类网站河北省最大的网页设计公司
  • 网站宣传页面做家教网站要多少钱
  • 邢台网站设计哪家专业曲靖网站微信建设
  • 郑志平爱站网创始人诸城网站建设葛小燕
  • 苏州网站建设凡科wordpress添加文章
  • 英文都不懂 学网站建设维护难吗一凡招聘 建筑人才网
  • 段友做的看电影网站合肥专业网站设计公司价格
  • 品牌网站建设堅持大蝌蚪开发一个网站要多久
  • 厦门市建设局网站咨询电话门户网站和微网站的区别
  • 软件开发培训去哪报名做移动网站优化首
  • 建设一个连接的网站wordpress销售主题
  • 服装网站html模板招聘网站上还要另外做简历吗
  • 微网站app制作实木餐桌椅网站建设公司
  • 沈阳网站建设那家好云南省城乡住房建设厅网站
  • 青岛物流公司网站建设连云港优化推广
  • 做seo时网站更新的目的专业做公墓 陵园的网站
  • 抚顺网站网站建设浦江县建设局网站
  • 佛山网页网站制作高柏企业管理咨询有限公司
  • 抖音平台建站工具北京网站定制建设