网站后端做留言板功能,网站建设的图片叠加步骤过程,企业网站手机网站建设,足球推介网Spark输出文件的个数#xff0c;如何合并小文件?
Spark的driver是怎么驱动作业流程的?
Spark SQL的劣势?
介绍下Spark Streaming和Structed Streaming
Spark为什么比Hadoop速度快?
DAG划分Spark源码实现?
Spark Streaming的双流join的过程#xff0c;怎么做的?
… Spark输出文件的个数如何合并小文件?
Spark的driver是怎么驱动作业流程的?
Spark SQL的劣势?
介绍下Spark Streaming和Structed Streaming
Spark为什么比Hadoop速度快?
DAG划分Spark源码实现?
Spark Streaming的双流join的过程怎么做的?
Spark的Block管理
Spark怎么保证数据不丢失
Spark SQL如何使用UDF?
Spark温度二次排序 Spark输出文件的个数如何合并小文件?
在Apache Spark中输出文件的个数通常由RDD弹性分布式数据集或DataFrame/Dataset在进行保存操作如saveAsTextFile, saveAsParquet, write等时的分区数量决定。默认情况下每个分区会产生一个输出文件。因此减少输出文件数量通常涉及到减少RDD或DataFrame的分区数。
如何合并小文件1、使用coalesce()或repartition()方法: 在DataFrame或RDD上使用coalesce(numPartitions)或repartition(numPartitions)方法可以改变分区数进而影响输出文件的数量。repartition()会导致全量的Shuffle而coalesce()在减少分区时尝试避免全量Shuffle因此在需要减少文件数量但又不想增加过多计算开销时更为推荐。但需注意当从大量分区合并到极少数分区时coalesce()也可能导致数据倾斜。2、自适应分区Adaptive Partitioning: 在Spark SQL中可以通过设置spark.sql.adaptive.enabled true来启用自适应查询执行特性它可以在执行过程中动态地调整分区数有助于减少小文件的生成。3、控制文件格式的输出配置: 对于Parquet、ORC等支持块大小或行数配置的文件格式在保存DataFrame时可以通过设置相应的配置如parquet.block.size、orc.stripe.size来控制输出文件的大小间接影响文件数量。4、使用单个任务合并文件: 可以创建一个新的Spark作业其目的仅为读取先前产生的小文件并使用coalesce(1)或repartition(1)将所有数据合并到一个分区中然后输出为一个大文件。这种方法适用于处理已经生成的小文件情况。5、修改默认shuffle分区数: 如果问题是由于默认的shuffle分区数过多导致的可以调整spark.sql.shuffle.partitions配置来减少默认的分区数。6、使用输出文件提交机制: 考虑FileOutputCommitter的不同版本V1和V2虽然这不是直接合并文件的方法但理解这些机制有助于优化输出过程特别是在如何管理临时文件和最终文件方面。 通过上述方法可以有效地控制和减少Spark作业产生的输出文件数量从而优化存储空间使用和后续的数据处理效率。
Spark的driver是怎么驱动作业流程的?
Spark的Driver是Spark应用程序的核心协调组件负责管理并驱动整个作业的执行流程。以下是Driver驱动作业流程的主要步骤
1、初始化SparkContext: 当Spark应用启动时Driver首先会创建一个SparkContext对象常简称为sc。SparkContext是Spark与集群资源管理器如Standalone、YARN或Mesos交互的入口点负责初始化Spark应用并与之建立连接包括请求必要的资源。2、构建DAG有向无环图: Driver会接收用户程序中的RDD弹性分布式数据集操作如map、filter、reduceByKey等并将这些操作转换为一系列任务。这一系列操作会被组织成一个DAG这个DAG描述了数据处理的逻辑和依赖关系。3、划分Stage: Driver会进一步分析DAG基于shuffle操作将其划分为不同的Stage。宽依赖如reduceByKey会触发新的Stage的开始而窄依赖如map则可以在同一个Stage内连续执行。这样做的目的是为了优化执行计划减少数据的跨节点传输。4、任务调度: Driver根据划分好的Stage生成具体的任务Tasks并将这些任务分发到各个Executor上执行。Driver负责跟踪任务的状态包括任务的成功、失败或重试并且根据Executor的资源使用情况和任务执行进度动态地调整任务调度。5、资源管理: Driver与集群管理器Master协作根据应用的需求动态申请和释放Executor资源。Executor是实际执行计算任务的进程它们在Worker节点上运行并由Driver分配任务。6、监控和故障恢复: Driver还会监控整个作业的执行情况包括Executor的运行状态。如果Executor失败或任务执行出错Driver可以重新安排任务执行以确保作业的最终完成。7、结果收集与展示: 当所有任务执行完毕后Driver负责收集Executor返回的结果并可能进一步处理这些结果最终将应用的输出呈现给用户。对于交互式查询例如Spark SQLDriver还负责维护结果集并提供查询接口。 整个流程中Driver作为大脑不断地接收用户程序指令制定执行策略与集群资源管理系统交互调度任务并确保作业能够高效、正确地执行完毕。
Spark SQL的劣势?
Spark SQL作为Apache Spark中用于处理结构化数据的模块虽然在处理大规模数据、支持多种数据源、提供高性能的查询和分析能力等方面具有显著优势但仍然存在一些劣势具体可以归纳如下
1、稳定性问题 相对于Hive等传统的大数据查询工具Spark SQL在处理大规模任务时特别是在SQL复杂度较高或数据量巨大的情况下可能会出现稳定性问题如内存溢出OOM等问题。 在某些情况下如进行join或group by等操作时如果数据量过大Spark SQL可能会遇到性能瓶颈和稳定性挑战。2、资源利用率 在某些部署模式下如Spark SQL模式即每次提交SQL任务都会启动一个新的application可能会导致资源利用率较低因为每个任务都需要独立启动和占用资源。 相对于Hive等基于MapReduce的框架Spark SQL可能无法充分利用集群资源尤其是在处理某些特定类型的工作负载时。3、学习曲线 Spark SQL虽然提供了SQL查询接口和DataFrame API两种操作方式但对于不熟悉Spark生态系统的用户来说可能需要花费一定的时间来学习和掌握这些工具和技术。 DataFrame和DataSet等高级抽象概念的使用也需要一定的编程经验和技能。4、多租户和权限管理 在某些部署模式下如使用ThriftServer时由于资源没有隔离不同用户之间的SQL任务可能会相互影响导致稳定性问题。 Spark SQL本身可能不提供完善的多租户和权限管理功能需要依赖外部工具或框架来实现。5、与其他组件的集成 虽然Spark SQL作为Spark生态系统的一部分可以与其他Spark组件如Spark Streaming、MLlib等无缝集成但在与其他非Spark组件如Hadoop生态系统中的其他工具集成时可能会面临一些挑战和限制。6、错误检查和调试 虽然DataSet提供了编译时的类型检查功能但DataFrame并不具备这种能力这可能导致在运行时才发现类型错误等问题。 在处理复杂的数据转换和计算任务时可能需要花费一定的时间来调试和排查问题。7、部署和维护 根据不同的业务需求和集群环境选择合适的Spark SQL部署模式可能需要一定的技术决策和经验。 部署和维护一个高性能、稳定的Spark SQL集群也需要一定的技术能力和资源投入。 综上所述虽然Spark SQL在处理结构化数据方面具有显著优势但在稳定性、资源利用率、学习曲线、多租户和权限管理、与其他组件的集成、错误检查和调试以及部署和维护等方面仍然存在一些挑战和限制。在使用Spark SQL时需要根据具体的业务需求和场景来权衡这些因素并采取相应的策略和技术来优化性能和提高稳定性。
介绍下Spark Streaming和Structed Streaming
Spark Streaming和Structured Streaming都是Apache Spark生态系统中的组件用于处理实时数据流。以下是关于这两个组件的详细介绍
Spark Streaming 1. 概述
Spark Streaming是Apache Spark的一个核心扩展用于进行可扩展、高吞吐量和容错的实时数据流处理。它允许用户通过流式计算引擎处理实时数据流并以低延迟的方式对数据进行分析、处理和存储。
2. 主要特点
实时数据处理能够实时处理来自各种数据源如Kafka、Flume、Kinesis等的数据流。 低延迟相比传统的批处理系统Spark Streaming能够实现毫秒级的延迟。 容错性提供了高度的容错性能够在节点故障时自动恢复保证数据处理的可靠性和稳定性。 扩展性通过Spark的弹性分布式计算模型Spark Streaming能够轻松地扩展到数千台节点处理大规模的数据流。 3. 实现原理
Spark Streaming将实时数据流划分为一系列称为微批次micro-batches的小批量数据并在每个微批次内使用Spark引擎进行批处理计算。这种微批次的方式使得Spark Streaming具有与批处理系统相似的编程模型并且能够利用Spark引擎的优化和性能。
Structured Streaming 1. 概述
Structured Streaming是Spark SQL的一个扩展用于处理实时数据流。它是一个基于Spark SQL引擎构建的可伸缩的且具有容错性的实时流处理引擎主要处理无界数据。
2. 主要特点
实时计算框架作为一个实时计算框架Structured Streaming使用Dataset作为数据模型支持多种编程语言如Scala、Java、Python。 容错性通过checkpoint和WALWrite-Ahead Logging进行数据容错保证数据处理的可靠性。 Exactly-Once语义确保每条记录仅被处理一次即使在故障恢复后也是如此。 与Spark SQL的集成Structured Streaming与Spark SQL紧密集成使得流处理和批处理可以使用相同的代码和工具。 3. 编程模型
在Structured Streaming中流数据被视为一个无界的表其数据随着时间的推移而不断增长。开发人员可以使用与批处理相同的DataFrame和Dataset API来处理流数据。Structured Streaming还提供了丰富的转换操作如map、reduce、join等使用户可以方便地对数据流进行处理。
4. 与Spark Streaming的比较
Structured Streaming可以看作是Spark Streaming的进化版它提供了更高的抽象级别和更简洁的API使得实时数据处理变得更加容易和直观。同时Structured Streaming还利用了Spark SQL的优化器如Catalyst来优化执行计划从而提高了处理效率。
综上所述Spark Streaming和Structured Streaming都是用于实时数据流处理的强大工具但它们在实现原理、编程模型和特性上有所不同。根据具体的应用场景和需求可以选择适合的组件来处理实时数据流。
Spark为什么比Hadoop速度快?
Spark 相比于 Hadoop MapReduce 在速度上有显著提升主要原因包括以下几个方面
1、基于内存的计算Spark 设计之初就强调内存计算能力它能够将数据载入内存中进行处理极大地加速了数据处理速度。当数据在内存中时访问速度远超硬盘I/O尤其是在迭代计算和需要多次访问相同数据集的场景下如机器学习算法的训练过程。2、DAG执行模型Spark 引入了有向无环图DAG执行引擎这使得它能更灵活地优化作业执行计划。不同于Hadoop MapReduce的线性作业流程Spark可以构建复杂的多阶段计算流程并且优化执行顺序减少不必要的数据读写操作尤其是减少了shuffle过程中磁盘I/O的需求。3、减少磁盘I/O操作在Hadoop MapReduce中每次Map和Reduce操作之间的数据交换都需要写入磁盘然后读取。而Spark在很多情况下能避免这样的中间结果落地磁盘直接在内存中进行传递或者仅在必要时才写入磁盘显著减少了I/O开销。4、任务调度优化Spark的DAGScheduler能够更好地理解整个作业的依赖关系并据此做出更智能的调度决策。它能够合并任务减少Job的执行阶段进一步提高执行效率。5、JVM和线程优化Spark利用多线程模型在Executor进程中执行任务而不是为每个任务启动一个新的JVM实例这样减少了JVM启动的开销。同时Spark对JVM进行了优化比如使用了基于内存的序列化库Kryo以提高序列化和反序列化的效率。6、数据持久化与缓存Spark支持将经常访问的数据集持久化到内存中甚至可以将数据存储在磁盘上以应对内存不足的情况这使得重复访问的数据无需重新计算进一步加速了处理过程。7、统一的数据处理平台Spark提供了统一的API来处理批处理、实时流处理、交互式查询以及机器学习等多种工作负载这意味着数据处理流程可以在同一框架内无缝集成减少了数据在不同系统间移动的开销。 综上所述Spark通过内存计算、高效的执行模型、减少磁盘I/O、智能调度以及对JVM和数据处理流程的优化实现了比Hadoop MapReduce更快的数据处理速度。
DAG划分Spark源码实现?
在Apache Spark中DAGDirected Acyclic Graph有向无环图是一个核心概念用于表示一个Spark作业中各个任务之间的依赖关系。然而Spark源码中并没有直接名为“DAG划分”的特定部分因为DAG的构建和划分是分散在多个组件和阶段中的。不过我可以概述一下在Spark中如何构建和处理DAG的大致流程。
1、构建DAG 当用户提交一个Spark作业如一个Spark SQL查询或一个RDD转换链时Spark的Driver会解析这个作业并构建一个逻辑执行计划Logical Plan。 这个逻辑执行计划可以被视为一个DAG其中每个节点代表一个操作如投影、过滤、连接等而边则代表数据在这些操作之间的流动。 接着Spark的优化器Catalyst Optimizer会对逻辑执行计划进行优化可能会改变DAG的结构以提高执行效率。2、物理执行计划的生成 优化后的逻辑执行计划会被转换为物理执行计划Physical Plan这个物理执行计划更接近于实际的执行流程。 在物理执行计划的生成过程中Spark会考虑数据的分区、可用的资源以及其他物理因素以便为每个任务分配适当的资源。3、DAG划分任务划分 物理执行计划中的每个节点最终都会映射到一个或多个具体的任务Task上。这些任务会被发送到Executor上执行。 DAG划分的主要目标是将物理执行计划中的节点划分为可以并行执行的任务并尽量减少跨节点数据传输的开销。 Spark使用了一种称为“窄依赖”和“宽依赖”的概念来区分不同类型的依赖关系并根据这些依赖关系来划分任务。窄依赖通常可以在单个任务内解决而宽依赖则可能需要跨多个任务进行shuffle操作。4、任务调度和执行 一旦DAG被划分为一系列任务Spark的TaskScheduler就会负责将这些任务调度到集群中的Executor上执行。 TaskScheduler会考虑Executor的资源使用情况、任务的优先级以及其他因素来制定调度决策。 Executor在接收到任务后会执行相应的代码并将结果返回给Driver。 需要注意的是虽然DAG划分是Spark内部的一个重要过程但这个过程对用户来说是透明的。用户通常只需要关注如何编写高效的Spark作业如优化数据分区、减少跨节点数据传输等而不需要直接关心DAG是如何被构建和划分的。
Spark Streaming的双流join的过程怎么做的?
Spark Streaming的双流join是指在实时数据处理场景中将来自两个不同数据流DStream的数据根据某些共有的键key进行匹配和合并的操作。下面是进行双流join的基本步骤
1、准备数据流首先你需要有两个DStream实例这两个流可以源自不同的数据源比如Kafka、Flume、socket连接等。2、定义窗口由于流处理是连续不断的为了控制join的范围通常会定义一个时间窗口。例如你可能会说“只在最近30秒内的数据中进行join”。窗口定义帮助限制了数据的范围确保join操作是在合理的时间界限内进行的。3、键值对转换为了执行join操作需要将两个DStream中的数据转换成键值对K,V的形式。这里的键K就是你想要基于其进行join的字段。4、执行Join操作Spark Streaming提供了多种类型的join操作如内连接inner join、左外连接leftOuterJoin、右外连接rightOuterJoin等。选择合适的join类型取决于你的具体需求。这个join操作是在每个定义好的窗口上独立进行的意味着它会匹配每个窗口内两个流中的对应键值对。5、处理延迟和数据不一致性在实际应用中由于网络延迟或处理时间的差异两个流的数据可能不会完全对齐。为了解决这个问题可能需要设置适当的滑动窗口大小以允许数据对齐或者采用一些补偿策略如缓存数据或设置等待时间来尽量保证数据的一致性。6、结果处理join操作完成后得到的新DStream包含了匹配后的数据你可以继续对此DStream应用其他转换或输出操作如存储到数据库、文件系统或发送到消息队列等。 通过上述步骤Spark Streaming实现了在实时数据流上的灵活且高效的数据合并功能这对于需要整合多个数据来源进行综合分析的场景非常有用。
Spark的Block管理
Spark的Block管理是其存储和计算框架中的核心组件之一主要负责管理在集群中分布式存储的数据块Block。以下是关于Spark Block管理的详细解释
1. Block的定义与重要性定义在Spark中Block是数据处理的基本单元用于存储RDD弹性分布式数据集的分区、Shuffle输出、广播变量等。重要性通过Block管理Spark能够高效地处理和共享数据提高数据处理的速度和可靠性。2. BlockManager的角色功能BlockManager是Spark中用于管理Block的模块它负责Block的存储、复制、交换、缓存和回收等操作。分布性在Spark集群中每个Executor都会启动一个BlockManager来管理该Executor上的Block。通信BlockManager通过Driver上的Master节点BlockManagerMaster进行通信Master节点负责协调所有Executor上的BlockManager。3. Block的标识符与元数据BlockId每个Block都有唯一的标识符BlockId用于在集群中定位和管理Block。BlockInfoBlockInfo类包含了Block的元数据如BlockId、存储级别StorageLevel、数据内容等。4. Block的存储与复制存储Block可以存储在内存或磁盘上对于小数据块BlockManager会优先将其存储在内存中以提高访问速度对于大数据块则会将其存储在磁盘上。复制为了保证数据的可靠性和高可用性BlockManager会自动将数据块复制到其他节点上以避免数据丢失或节点故障导致数据无法访问。复制策略可以根据具体需求进行配置。5. Block的缓存与回收缓存为了提高计算效率BlockManager支持将数据块缓存在内存中以避免频繁地从磁盘或外部存储系统中读取数据块。回收BlockManager还会定期清除一些不再使用的数据块以释放资源。6. Block的传输与序列化传输在Spark中数据块经常需要在不同的节点之间传输和共享因此需要进行序列化和反序列化。BlockManager提供了常用的序列化和反序列化方式如Java序列化、Kryo序列化等。7. 故障恢复与数据迁移故障恢复当一个节点出现故障或者网络出现问题时BlockManager会自动进行故障恢复将丢失的数据块重新复制到其他节点上。数据迁移在集群扩容或缩容时BlockManager还支持数据迁移以保证数据块的平衡分布。8. BlockManager的实现类结构BlockManager主要由几个关键类组成包括BlockManagerMaster负责管理集群中所有节点的BlockManager、BlockManager负责管理本地节点的数据块、BlockInfo表示一个数据块的信息等。API操作开发人员可以通过BlockManager提供的API来操作Block以满足不同的数据处理需求。
Spark怎么保证数据不丢失
Spark通过多种机制来保证数据处理过程中的高容错性和防止数据丢失主要机制包括但不限于以下几点
1、RDD的不可变性和血统Lineage RDD弹性分布式数据集是Spark的核心数据结构具有不可变性。这意味着创建之后的RDD不能被修改只能通过对现有RDD进行转换来生成新的RDD。 每个RDD都维护着一个血统信息记录了从原始数据到当前RDD的所有转换步骤。如果部分数据丢失Spark可以根据这些转换步骤重新计算丢失的数据从而实现数据恢复。2、Write-Ahead Log (WAL) 特别是在Spark Streaming与外部数据源如Kafka集成时使用Write-Ahead Log可以确保数据在接收后立即被记录到持久化存储如HDFS或S3中即使在处理过程中发生故障也能从日志中恢复数据防止数据丢失。3、Checkpointing检查点机制 Checkpointing允许Spark定期将计算状态信息包括DStream的元数据和累加器的值保存到可靠的存储系统如HDFS以在Driver故障时恢复状态。这对于长时间运行的流处理应用尤为重要可以恢复到最近的检查点状态继续执行。4、Direct API直接模式 Spark Streaming的直接DirectAPI特别是与Kafka集成时允许直接从Kafka读取数据并管理偏移量消除了Receiver这一潜在的故障点。结合WAL和Checkpointing可以实现端到端的数据不丢失处理。5、Receiver Recovery and Reliability 使用Reliable Receiver确保即使在Receiver失败的情况下也不会丢失数据。当Receiver重启时它能从上次成功消费的位置继续消费数据。6、事务管理 在处理需要保证数据一致性的场景时如Spark与Kafka集成时通过事务管理机制确保数据处理和偏移量提交的原子性防止数据重复消费或丢失。 这些机制共同作用确保了Spark在面对各种故障情况时能够保持数据的完整性从而提供一个高可用和可靠的数据处理环境。
Spark SQL如何使用UDF?
在Spark SQL中使用用户自定义函数UDF是一种常见做法用于扩展Spark SQL的内置函数集以便执行自定义逻辑。以下是使用不同编程语言Scala、Python、Java创建和注册UDF的基本步骤
Scala1、定义UDF: 首先你需要定义一个函数这个函数接受一个或多个列作为输入并返回一个结果。你可以使用Scala的函数语法或表达式。
import org.apache.spark.sql.functions.udf// 定义一个简单的UDF计算字符串长度
val strLengthUDF udf((input: String) input.length)
2、注册UDF: 然后使用udf.register方法将其注册到SparkSession中。
spark.udf.register(strLength, strLengthUDF)
3、在查询中使用UDF: 注册后你可以在Spark SQL查询中像使用任何其他内置函数一样使用它。
val df spark.read.text(input.txt)
df.selectExpr(strLength(value)).show()
或者在DataFrame API中直接使用
df.select(strLengthUDF($value)).show()
Python (PySpark)1、定义UDF: 使用pyspark.sql.functions.udf装饰器定义一个Python函数。
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType# 定义一个简单的UDF计算字符串长度
def str_length(input):return len(input)strLengthUDF udf(str_length, IntegerType())
2、注册和使用UDF: 在Python中UDF会自动注册到当前的SparkSession并可以直接在DataFrame操作中使用。
df spark.read.text(input.txt)
df.select(strLengthUDF(df[value])).show()
Java1、定义和注册UDF: 在Java中你需要创建一个继承自org.apache.spark.sql.api.java.UDF1或相应的UDF2, UDF3等取决于输入参数的数量的类并实现call方法。
import org.apache.spark.sql.api.java.*;
import static org.apache.spark.sql.functions.udf;public class StrLengthUDF extends UDF1String, Integer {Overridepublic Integer call(String input) {return input.length();}
}// 在SparkSession中注册UDF
SparkSession spark SparkSession.builder().appName(JavaUDFExample).getOrCreate();
UDF1String, Integer udf new StrLengthUDF();
spark.udf().register(strLength, udf, DataTypes.IntegerType);
2、使用UDF:
DatasetRow df spark.read().text(input.txt);
df.createOrReplaceTempView(textTable);
DatasetRow result spark.sql(SELECT strLength(value) FROM textTable);
result.show();
注意对于复杂的逻辑或需要访问外部资源的情况你可能需要考虑使用更高级的函数类型如Pandas UDF仅限Python或在Scala/Java中实现更复杂的类结构。
Spark温度二次排序
在Spark中实现温度的二次排序通常涉及到对数据集中的记录首先按照一个键例如年份进行排序然后在同一键内再按照另一个键例如温度进行排序。这种需求常见于需要在特定时间段内找出最高或最低温度等场景。下面是一个基于Scala的示例展示如何实现这样的二次排序
步骤说明1、定义自定义排序键首先需要定义一个自定义的类来表示排序键这个类需要实现Ordered接口Scala特有用于自然排序和Serializable接口以便在网络间传输。这个类应该包含所有用于排序的字段并重写compare方法来定义排序逻辑。2、加载数据使用SparkContext或SparkSession从数据源加载数据。3、转换数据将原始数据映射到包含自定义排序键和原始记录的键值对。4、排序使用sortByKey方法对转换后的数据进行排序。Spark 2.x及以后版本推荐使用Dataset的orderBy方法因为它更加灵活和高效。5、提取结果排序后去除排序用的键只保留原始记录。示例代码 假设我们有一个文本文件每行包含年份、月份和温度格式如“年 月 温度”。
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._object TemperatureSecondSort {def main(args: Array[String]): Unit {val spark SparkSession.builder().appName(TemperatureSecondSort).master(local[1]).getOrCreate()// 假设数据已加载到DataFrame中格式为(year, month, temperature)val data Seq((2023, 1, 15),(2023, 2, 18),(2023, 1, 20),(2024, 1, 16),(2024, 2, 19)).toDF(year, month, temperature)// 二次排序首先按年份升序然后在同一年份内按温度降序val sortedData data.orderBy($year.asc, $temperature.desc)// 显示排序后的结果sortedData.show()spark.stop()}
}
在这个例子中我们没有使用自定义类来实现Ordered接口因为Spark DataFrame提供了丰富的排序API可以直接通过.orderBy方法实现多列排序。这里我们先按year列升序排序然后在相同年份内部按temperature列降序排列。
引用https://www.nowcoder.com/discuss/353159520220291072
通义千问、文心一言