当前位置: 首页 > news >正文

彩票网站怎么做收银wordpress后台使用方法

彩票网站怎么做收银,wordpress后台使用方法,网站开发 只要,欧洲男女做受视频网站文章目录 前言基于任务需求进行资源请求的整体过程资源申请的生成过程详解资源申请的生成过程的简单例子资源调度算法的代码解析 申请资源以后的处理#xff1a;Executor的启动或者结束对于新启动的Container的处理对于结束的Container的处理 基于资源分配结果进行任务调度Pen… 文章目录 前言基于任务需求进行资源请求的整体过程资源申请的生成过程详解资源申请的生成过程的简单例子资源调度算法的代码解析 申请资源以后的处理Executor的启动或者结束对于新启动的Container的处理对于结束的Container的处理 基于资源分配结果进行任务调度PendingTask的生成:TaskLocation的构造过程根据TaskLocation信息将Task添加到不同的pendingTask数组中 可用的LocationLevel的计算基于Locality的Task调度 结语 前言 在TODO这篇文章中介绍了Spark RPC通信的基本流程。可以看到Spark中的Driver通过Stage调度生成了物理执行计划这个物理执行计划包括了所有需要运行的Task以及最关键的这些Task希望运行的节点信息我们叫做Locality Preference即本地性偏好。 但是在Yarn的场景下资源是以Executor(Container)为单位进行调度整个Container的粒度与Task不一一对应Container的生命周期与Task不一一对应。在这种场景下动态资源调度的基本任务就是 根据这些Task以及Task的本地性偏好向Yarn申请Container作为Executor以执行Task这里最重要的是在资源申请中表达出对应的Task的位置偏好。在申请完资源以后根据Task的本地性偏好将Task调度到申请到的资源上面去。这里最重要的是尽量满足Task的本地性偏好。 本文将详细讲解这个过程。 对于资源申请算法的基本流程以及将Task和资源进行匹配的基本流程本文都用实际例子进行讲解。 基于任务需求进行资源请求的整体过程 向Yarn请求资源是由客户端向ApplicationMaster申请然后ApplicationMaster向Yarn发起请求的而不是客户端直接向Yarn申请的。 资源是为了服务于Task的运行Task的生成显然是Driver端负责的Driver会根据物理执行计划生成的Task信息发送给ApplicationMasterApplicationMaster根据这些Task的相关信息进行资源申请。 ApplicationMaster启动以后会有一个独立线程不断通过调用YarnAllocator.allocateResources()进行持续的资源更新(查看ApplicationMaster的launchReporterThread()方法)。这里叫资源更新而不叫资源申请因为这里的操作包括新的资源的申请旧的无用的Container的取消以及Blocklist Node的更新等多种操作。 总而言之ApplicationMaster作为客户端和Yarn的中间方其资源申请的方法allocateResource()在逻辑上的功能为 粒度转换: 将Task级别的资源请求转换为Container(Executor级别的资源请求)。这是一个游戏到粗的粒度的转换。维度转换: Driver发过来的资源请求是资源的最终全局状态而Yarn 的 API 要求的针对每一个Container进行增量请求。因此allocateResources()会将Driver发送过来资源请求的最终状态对比当前系统已经运行、分配未运行、已经发送请求但是还没有分配资源等等已经存在的状态确定一个发送给Yarn的增量请求状态。这是一个全量到增量的维度的转换。角度转换: Driver发过来的每个Task都带有各自Task的Locality而发送给Yarn的Container请求又是带有Locality需求的Container需求。这是一个从Task到Container的角度的转换。 ApplicationMaster端的allocateResources()方法的基本流程在代码YarnAllocator.allocateResources()中: ---------------------------------- YarnAllocator ------------------------------------def allocateResources(): Unit synchronized {updateResourceRequests() // 与Yarn进行资源交互val allocateResponse amClient.allocate(progressIndicator) // 从Yarn端获取资源结果包括新分配的、已经结束的等等val allocatedContainers allocateResponse.getAllocatedContainers()handleAllocatedContainers(allocatedContainers.asScala) // 处理新分配的Containerval completedContainers allocateResponse.getCompletedContainersStatuses() // 处理已经结束的ContainerprocessCompletedContainers(completedContainers.asScala)}}ApplicationMaster端的allocateResources()的基本流程如下图所示: 生成资源请求即将Driver发送过来的全量的、Task粒度的资源请求和Host偏好信息转换为对Yarn的、以Executor为粒度的资源请求updateResourceRequests()将资源请求发送给Yarn并从Yarn上获取分配结果(基于Yarn的异步调度策略这次获取的记过并非本次资源请求的分配结果)以进行后续处理val allocatedContainers allocateResponse.getAllocatedContainers() handleAllocatedContainers(allocatedContainers.asScala) val completedContainers allocateResponse.getCompletedContainersStatuses() processCompletedContainers(completedContainers.asScala)可以看到updateResourceRequests()是资源请求的核心方法它会负责同Yarn进行通信以进行资源请求。 在TODO中我们也介绍过生成资源请求其决策过程发生在方法updateResourceRequests()中。我们主要来看updateResourceRequests()方法 ---------------------------------- YarnAllocator ------------------------------------def updateResourceRequests(): Unit {// 获取已经发送给Yarn但是待分配的ContainerRequest计算待分配容器请求的数量// 这些ContainerRequest是之前通过调用amClient.addContainerRequest 发送出去的val pendingAllocate getPendingAllocateval numPendingAllocate pendingAllocate.size// 还没有发送请求的executor的数量val missing targetNumExecutors - numPendingAllocate -numExecutorsStarting.get - numExecutorsRunning.get// 还没有发送给Yarn的资源请求if (missing 0) { /*** 将待处理的container请求分为三组本地匹配列表、本地不匹配列表和非本地列表。*/val (localRequests, staleRequests, anyHostRequests) splitPendingAllocationsByLocality(hostToLocalTaskCounts, pendingAllocate)// staleRequests 的意思是ApplicationMaster已经请求了这个Container// 但是这个ContainerRequest所要求的hosts里面没有一个是在 hostToLocalTaskCounts (即task所倾向于)中的因此需要取消这个Container Request因为已经没有意义了// cancel stale requests for locations that are no longer neededstaleRequests.foreach { stale amClient.removeContainerRequest(stale)}val cancelledContainers staleRequests.size// consider the number of new containers and cancelled stale containers available// 将新的container请求以及刚刚取消的container作为available containerval availableContainers missing cancelledContainers// to maximize locality, include requests with no locality preference that can be cancelled// 在availableContainers的基础上再算上没有任何locality要求的并且还没有分配成功的containerval potentialContainers availableContainers anyHostRequests.size// LocalityPreferredContainerPlacementStrategy计算每一个Container 的Node locality和 Rack localityval containerLocalityPreferences containerPlacementStrategy.localityOfRequestedContainers(potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,allocatedHostToContainersMap, localRequests)val newLocalityRequests new mutable.ArrayBuffer[ContainerRequest]// 遍历ContainerLocalityPreferences数组中的每一个ContainerLocalityPreferencescontainerLocalityPreferences.foreach {case ContainerLocalityPreferences(nodes, racks) if nodes ! null newLocalityRequests createContainerRequest(resource, nodes, racks)// 根据获取的locality重新创建ContainerRequest请求}// 除了有locality需求的container以外还有更多的available container需要被请求因此对这些container请求也发送出去if (availableContainers newLocalityRequests.size) {// more containers are available than needed for locality, fill in requests for any hostfor (i - 0 until (availableContainers - newLocalityRequests.size)) {newLocalityRequests createContainerRequest(resource, null, null) // 构造ContainerRequest对象}} else {val numToCancel newLocalityRequests.size - availableContainers// cancel some requests without locality preferences to schedule more local containersanyHostRequests.slice(0, numToCancel).foreach { nonLocal amClient.removeContainerRequest(nonLocal)}}} else if (numPendingAllocate 0 missing 0) {val numToCancel math.min(numPendingAllocate, -missing)val matchingRequests amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)matchingRequests.iterator().next().asScala.take(numToCancel).foreach(amClient.removeContainerRequest)}}其基本过程为 获取当前Pending的request(已经发送给Yarn但是还没有分配Container的请求)并将这些Pending的请求按照本地性的需求进行切分。这里的基本意图是当前收到了来自Driver的全局的资源状态信息而在Yarn上还有一部分之前的资源请求还没有分配Container那么会不会这些Pending Requewt中有些Request已经不需要了(满足不了任何一个task的locality需求) val (localRequests, staleRequests, anyHostRequests) splitPendingAllocationsByLocality(hostToLocalTaskCounts, pendingAllocate)切分的过程就是查看当前在Yarn这一端pending的所有的Container的locality与我们目前需求的所有(全局)的Task的locality的交集: -------------------------------------- YarnAllocator ----------------------------------private def splitPendingAllocationsByLocality(hostToLocalTaskCount: Map[String, Int], // 每一个host到希望分配上去的task的数量pendingAllocations: Seq[ContainerRequest] // 还没有分配出去的ContainerRequest): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) {val localityMatched ArrayBuffer[ContainerRequest]()val localityUnMatched ArrayBuffer[ContainerRequest]()val localityFree ArrayBuffer[ContainerRequest]()val preferredHosts hostToLocalTaskCount.keySet// 将当前已经发送给Yarn但是还没有分配的Container的请求进行切分pendingAllocations.foreach { cr val nodes cr.getNodes // 这个 ContainerRequest 对节点的要求if (nodes null) {localityFree cr // 这个ContainerRequest对nodes没有要求那么就是对本地性没有要求的Container请求} else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) { // 这个Container的本地性要求和task期望分配的hosts集合有交集localityMatched cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去} else { // 这个Container的本地性要求和task期望分配的hosts集合没有交集localityUnMatched cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去}}// 切分结果 (localRequests, staleRequests, anyHostRequests)(localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)}}切分过程的具体流程如下图所示: 切分的具体流程为 : 如果这个Pending Container没有任何的locality要求那么就是localityFree Container即其实际分配的位置有可能是当前所有tasks所希望的位置也可能不是那么这个container就是localityFree containerif (nodes null) {localityFree cr // 这个ContainerRequest对nodes没有要求那么就是对本地性没有要求的Container请求} 如果这个Pending Container有locality 要求并且这个locality的nodes与当前所有tasks有交集那么这个Pending Container就被划分为localityMatched显然这个Pending Container是不应该被取消的else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) { // 这个Container的本地性要求和task期望分配的hosts集合有交集localityMatched cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去}如果这个Pending Container有locality要求但是这个locality中的nodes不在当前所有tasks的locality中的任何一个节点即这个Pending Container实际分配的位置不可能是任何一个task所倾向于的位置那么这个Pending Container就是localityUnMatched显然localityUnMatched container目前无法放置任何一个task需要取消掉else { // 这个Container的本地性要求和task期望分配的hosts集合没有交集localityUnMatched cr // 把这个ContainerRequest添加到localityMatched的ContainerRequest集合中去}对于localityUnmatched container向Yarn发送请求取消这种Container这些被取消的Container在后面会重新申请以便在申请的资源总量不变的情况下增强资源的本地特性: staleRequests.foreach { stale amClient.removeContainerRequest(stale)}计算总的Container的数量包括 Pending Container中刚刚cancel的container的数量这些Container刚刚取消了我们可以再次申请这些Container但是肯定会增强这些新的资源请求的locality以最大化我们的Task的localityPending Container中的locality free的Container数量这些Container可能分配在集群中的任何地方新增(missing)的Container请求即当前的总的container请求中除去正在运行(已经有task在运行numExecutorsRunning)和正在启动(已经分配但是还没分配tasknumExecutorsStarting)的再除去所有的pending的container(numPendingAllocate是从Yarn的API中获取的数量已经请求但是还没有分配成功的资源)多出来的Container: // 还没有发送请求的executor的数量val missing targetNumExecutors - numPendingAllocate -numExecutorsStarting.get - numExecutorsRunning.get .....// consider the number of new containers and cancelled stale containers available// 将新的container请求以及刚刚取消的container作为available containerval availableContainers missing cancelledContainers// to maximize locality, include requests with no locality preference that can be cancelled// 在availableContainers的基础上再算上没有任何locality要求的并且还没有分配成功的containerval potentialContainers availableContainers anyHostRequests.size在这里val availableContainers missing cancelledContainers即available container代表这次可以增量申请的最大的container数量包括了这次的额外需求以及刚刚取消的container(取消的container可以重新申请) 构建Container请求。这里会根据LocalityPreferredContainerPlacementStrategy的localityOfRequestedContainers来构建Container请求返回Array[ContainerLocalityPreferences]每一个ContainerLocalityPreferences代表了一个带有对应host和rack信息的Container请求: val containerLocalityPreferences containerPlacementStrategy.localityOfRequestedContainers(potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,allocatedHostToContainersMap, localRequests)根据ContainerLocalityPreferences转换成Yarn的 ContainerRequest containerLocalityPreferences.foreach {case ContainerLocalityPreferences(nodes, racks) if nodes ! null newLocalityRequests createContainerRequest(resource, nodes, racks)// 根据获取的locality重新创建ContainerRequest请求case _ }如果可以申请的Container(available container)的数量大于刚刚计算完locality的Container数量那么为了将申请配额用尽就再申请其相差的部分Container 保证申请的Container的数量不小于Available Container的数量。 if (availableContainers newLocalityRequests.size) {// more containers are available than needed for locality, fill in requests for any hostfor (i - 0 until (availableContainers - newLocalityRequests.size)) {newLocalityRequests createContainerRequest(resource, null, null) // 构造ContainerRequest对象}}如果可以申请的Container(available container)的数量小于刚刚计算完locality的Container数量那么需要取消一部分container: else {val numToCancel newLocalityRequests.size - availableContainers// cancel some requests without locality preferences to schedule more local containersanyHostRequests.slice(0, numToCancel).foreach { nonLocal amClient.removeContainerRequest(nonLocal)}调用Yarn的标准接口addContainerRequest()将ContainerRequest发送给Yarn(其实这个接口并不会真正将请求发送出去只会存放在RMAMClient端真正发送是通过allocate()接口) newLocalityRequests.foreach { request amClient.addContainerRequest(request) } // 在这里发送container的请求从日志来看资源请求已经发出来了Yarn已经处理了所以从上面可以看到最关键的方法是LocalityPreferredContainerPlacementStrategy.localityOfRequestedContainers()方法它根据当前的已有信息(总共的Container需求有locality需求的task的数量这些locality分布在每一个task上的数量等)生成一个Array[ContainerLocalityPreferences]数组数组中的每一个元素代表了一个Container的需求并包含了其locality的要求信息然后基于生成的ContainerLocalityPreferences经过转换成ContainerRequest发送给Yarn。 资源申请的生成过程详解 资源申请的生成就是根据当前集群运行的基本情况Task的基本需求生成Yarn上的资源请求的过程。 资源申请的生成过程的简单例子 在了解其具体实现以前我们以具体例子的方式看一下localityOfRequestedContainers()方法的基本实现逻辑从而对其动机和达成的效果有一个很好的理解然后我们再看其实现细节。 从任务调度去看看到的是Task以及每个 Task的Locality倾向。比如现在我们一共需要为30个Task分配资源其中20个Task的locality倾向为Host1,Host2,Host310个Task的Locality倾向为Host1, Host2, Host4 因此对应到每个Host上的Task权重如下表所示 Host 1Host 2Host 3Host 420 Tasks20202010 Tasks101010Sum of Tasks30302010即20个Task希望分配在Host1,Host2,Host3中的任何一个有10个Task希望分配在Host1, Host2, Host4中的任何一个。如上表所示综合来看所有Task在四台机器上分配的权重是(30, 30, 20,10)。 假设一个Task需要的vCore是1而一个Container(Executor)有2个vCore因此转换成Container以后的结果如下表所示 Host 1Host 2Host 3Host 420 Tasks20202010 Tasks101010Sum of Tasks30302010Sum of Containers1515105上面的Sum of Containers的数字只是表示一个比例值并不表示对应的Host上实际需要申请的Container的数量我们实际需要的总的Container数量才15个。那么这15个Container需求平均到每台Host上是多少呢 比如Host 1的Sum of Container 为15 所有Host的Sum of Container 是45因此占比是1/3所以平均下来分配到Host1上的Container数量应该是 15 * 1/3 5。经过向上取整(宁可稍微多分配也不要少分配)以后每台机器所平均到的15个Container需求是: Host 1Host 2Host 3Host 420 Tasks20202010 Tasks101010Sum of Tasks30302010Sum of Containers1515105Allocated Container Target5542 在这里计算完总的Allocated Container Target以后需要减去当前已经在该Host上已经存在(正在运行或者在这个Host上pending的Container)因为我们最终发送给Yarn的Container请求是增量请求。假设现在在每一个Host上已经存在的Container数量都是1即15个Container中有4个Container是已经分配的那么减去已经存在的Container数量以后的结果如下表所示所以我们需要新申请12个Container: Host 1Host 2Host 3Host 420 Tasks20202010 Tasks101010Sum of Tasks30302010Sum of Containers1515126Allocated Container Target5542Newly Allocated Container4431 将每个Host的Newly Allocated Container按照比例进行缩放保证比例最大的那个Host(这里是Host1 和 Host 2)的比例值是需要新申请的Container的数量。在这里扩大因子应该是 12(Container的总数量)/4(比例最大的Host的Average Allocated Container) 3 : Host 1Host 2Host 3Host 420 Tasks20202010 Tasks101010Sum of Tasks30302010Sum of Containers1515105Allocated Container Target5542Newly Allocated Container4431Round Up121293 开始发起资源请求。每一个Container请求的Locality中包含的Host如下表所示 Host 1Host 2Host 3Host 33 Containers✔✔✔✔6 Containers✔✔✔3 Containers✔✔其含义是 3个Container请求的Locality是[Host1, Host2, Host3, Host4]即请求Yarn分配3个Container并且尽量将它们分配在这4个Hosts中。此时剩下的Host比例为[9:9:6:0]6个Container请求的Locality是[Host1, Host2, Host3]即请求Yarn分配6个Container并且尽量将它们分配在这3个Hosts中此时剩下的Host中container比例为[3:3:0]3个Container请求的Locality是[Host1, Host2]即请求Yarn分配3个Container并且尽量将它们分配在这2个Hosts中 这样所有Host的Container比例就是12:12:9:3平均到12个需分配的Container以后的比例是4:4:3:1再加上已经分配在每个host上的1个Container那么总的Container在每个Host上的比例就是5:5:4:2这个比例和我们直接根据每个Host的task比例折算成的Container的比例15:15:10:5是大致相近的。 到了这里我们可以理解了为什么我们需要在 步骤5 做Round Up操作并且Round Up的目标是将目前比例值最大的Host的比例值扩大为当前Container需求的最大值? 因为在步骤6中生成Container请求的时候比例值最大的Host的比例值肯定是等于需要申请的Container数量的。 资源调度算法的代码解析 上面以实际例子解释了Spark将当前的Task的Locality需求信息转换成Yarn的资源请求的细节。下面我们结合代码详细看一下localityOfRequestedContainers()方法的实现细节 def localityOfRequestedContainers(numContainer: Int, // 需要进行计算的container的数量包括missing的cancel掉的(本地性不符合任何task要求的pending container)以及对本地性没有要求的pending的containernumLocalityAwareTasks: Int, // 对locality有要求的task的数量这个是Driver端通过stageIdToExecutorPlacementHints计算然后通过RequestExecutor传递过来的hostToLocalTaskCount: Map[String, Int], // 在Stage提交了以后这个map里面保存了从host到期望分配到这个host的task的数量这个是Driver端通过stageIdToExecutorPlacementHints传递过来的allocatedHostToContainersMap: HashMap[String, Set[ContainerId]], // 已经launch起来的host - container的映射关系localityMatchedPendingAllocations: Seq[ContainerRequest] // 对本地性有要求的pending的container): Array[ContainerLocalityPreferences] {// 预期的从host到期望在上面再launch的新的container数量的映射关系val updatedHostToContainerCount expectedHostToContainerCount(numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,localityMatchedPendingAllocations)// 希望再launch的所有Host上的container的数量之和在这里的例子中是15val updatedLocalityAwareContainerNum updatedHostToContainerCount.values.sum// The number of containers to allocate, divided into two groups, one with preferred locality,// and the other without locality preference.// 没有locality需求的container的数量val requiredLocalityFreeContainerNum math.max(0, numContainer - updatedLocalityAwareContainerNum)// 有locality需求的container的数量val requiredLocalityAwareContainerNum numContainer - requiredLocalityFreeContainerNumval containerLocalityPreferences ArrayBuffer[ContainerLocalityPreferences]()if (requiredLocalityFreeContainerNum 0) { // 如果有container是没有locality需求的for (i - 0 until requiredLocalityFreeContainerNum) {containerLocalityPreferences ContainerLocalityPreferences( // 为这些没有locality需求的container一一创建container需求null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])}}if (requiredLocalityAwareContainerNum 0) { // 如果有container有locality需求val largestRatio updatedHostToContainerCount.values.max // 全局的所有host中最大的container数量// Round the ratio of preferred locality to the number of locality required container// number, which is used for locality preferred host calculating.var preferredLocalityRatio updatedHostToContainerCount.map { case(k, ratio) val adjustedRatio ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio(k, adjustedRatio.ceil.toInt) // 往上取整}// 每个有locality需求的的Container request为他们确定对应的hosts和rackfor (i - 0 until requiredLocalityAwareContainerNum) {// Only filter out the ratio which is larger than 0, which means the current host can// still be allocated with new container request.val hosts preferredLocalityRatio.filter(_._2 0).keys.toArray // 还有container可以分配的一个或者多个hostsval racks hosts.map { h resolver.resolve(yarnConf, h) // 解析这些host所在的rack}.toSet// 每一个ContainerLocalityPreferences代表一个ContainercontainerLocalityPreferences ContainerLocalityPreferences(hosts, racks.toArray)// Minus 1 each time when the host is used. When the current ratio is 0,// which means all the required ratio is satisfied, this host will not be allocated again.preferredLocalityRatio preferredLocalityRatio.map { case (k, v) (k, v - 1) }}}// containerLocalityPreferences中的每一项都会变成一个新的Container RequestcontainerLocalityPreferences.toArray}其参数的基本含义是 numContainer: Int 需要进行计算的Container的数量即可能进行分配的Container数量包括Miss的container(还没有申请的Container)Cancel掉的(本地性不符合任何task要求因此已经从Yarn上取消的pending container)。同时还包括Pending Container中对本地性没有要求的Container这一部分Container也是我们重新申请的对象以最大化Locality。上文讲到过的updateResourceRequests()方法中的potentialContainers就是传入到该方法的numContainers参数 // 将新的container请求以及刚刚取消的container作为available containerval availableContainers missing cancelledContainers// to maximize locality, include requests with no locality preference that can be cancelled// 在availableContainers的基础上再算上没有任何locality要求的并且还没有分配成功的containerval potentialContainers availableContainers anyHostRequests.sizenumLocalityAwareTasks: Int 对locality有要求的task的数量这个是Driver端通过对stageIdToExecutorPlacementHints计算然后通过RequestExecutor传递过来的数值。已经说过这是此时的全局状态量而不是一个增量 hostToLocalTaskCount: Map[String, Int] 在Stage提交了以后这个map里面保存了从host到期望分配到这个host的task的数量这个是Driver端通过stageIdToExecutorPlacementHints传递过来的具体过程是 在Driver端ExecutorAllocationManager的onStageSubmitted回调中会将这个Stage的task preference存放在stageIdToExecutorPlacementHints中。 ----------------------------------------- ExecutorAllocationManager ---------------------------------------- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit {.....// 计算这个stage在每一个host上的task数量// Compute the number of tasks requested by the stage on each hostvar numTasksPending 0val hostToLocalTaskCountPerStage new mutable.HashMap[String, Int]()stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality // 对于每一个task的prefered location的listnumTasksPending 1// 对于这个task的每一个 preferred locationlocality.foreach { location // 对于这个locality中的每一个location// 这个host上的task的数量1val count hostToLocalTaskCountPerStage.getOrElse(location.host, 0) 1hostToLocalTaskCountPerStage(location.host) count}}// 这个map的key是stage idvalue是一个元组记录了这个stage的pending的task的数量以及从host到task count的map信息stageIdToExecutorPlacementHints.put(stageId,(numTasksPending, hostToLocalTaskCountPerStage.toMap))updateExecutorPlacementHints() 随后ExecutorAllocationManager会有线程不断将这些信息通过RequestExecutors发送给远程的ApplicationMaster def start(): Unit {listenerBus.addToManagementQueue(listener)val scheduleTask new Runnable() {override def run(): Unit {schedule() // 这里会根据需要更新numExecutorsTarget的数量也会调用}}executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)}allocatedHostToContainersMap: HashMap[String, Set[ContainerId]] 已经launch起来的host - container的映射关系。这是updateResource()方法每次通过Yarn的标准API allocate()向Yarn询问以后获取的结果。我们说过allocate()接口用来向Yarn发送本次的资源请求并返回当前Yarn为这个Application分配的Container的结果。由于Yarn端的资源分配是异步分配因此allocate()返回的结果并非是这次请求的资源的分配结果而是两次相邻的allocate()请求发生之间的新产生的资源分配结果 localityMatchedPendingAllocations: Seq[ContainerRequest] 对本地性有要求的pending的container其在方法splitPendingAllocationsByLocality()中对Pending的Container的Locality状态进行切分后那些与当前请求的Task的Locality有交集的Pending Container将作为已经存在的Container整个资源请求的目标是使得新申请的Container和已经分配的Container加起来其资源倾向和所有Task的统计倾向尽量匹配从而最大程度满足Task的本地性需求。 localityOfRequestedContainers()算法的基本过程为 计算每一个Host上应该新分配的Container的数量的预期值。由于是新分配的Container的预期值因此需要先根据每个Host上的预期存在的Container的总的数量减去该Host上已经存在的Container val updatedHostToContainerCount expectedHostToContainerCount(numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap,localityMatchedPendingAllocations)这里的计算就是完成下表中从Sum of Tasks(每个机器上分配到的Task的比例) 到 Sum of Containers (每个机器上分配的Container的比例)的转换然后根据Sum of Containers 减去每台机器上已经分配的Container就得到了Average Allocated Container Total(每台机器上应该新分配的Container的数量) Host 1Host 2Host 3Host 420 Tasks20202010 Tasks101010Sum of Tasks30302010Sum of Containers1515126Allocated Container Target5542Newly-Allocated Container4431 根据上面计算的分配结果统计没有locality需求的Container的总数量和有locality需求的Container数量 // 希望再launch的所有Host上的container的数量之和在这里的例子中是15 val updatedLocalityAwareContainerNum updatedHostToContainerCount.values.sum// The number of containers to allocate, divided into two groups, one with preferred locality, // and the other without locality preference. // 没有locality需求的container的数量 val requiredLocalityFreeContainerNum math.max(0, numContainer - updatedLocalityAwareContainerNum)// 有locality需求的container的数量 val requiredLocalityAwareContainerNum numContainer - requiredLocalityFreeContainerNum先为 没有locality需求的Container 构造ContainerLocalityPreferences每一个ContainerLocalityPreferences对象对应了一个Container请求和这个请求的Locality需求。可以看到这种没有Locality需求的Container的Host 偏好和Rack 偏好都是空的 val containerLocalityPreferences ArrayBuffer[ContainerLocalityPreferences]() if (requiredLocalityFreeContainerNum 0) { // 如果有container是没有locality需求的for (i - 0 until requiredLocalityFreeContainerNum) {containerLocalityPreferences ContainerLocalityPreferences( // 为这些没有locality需求的container一一创建container需求null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])} }为 有locality需求的Container 构造ContainerLocalityPreferences对象每一个ContainerLocalityPreferences对象封装了这个Container请求和这个请求的Locality需求 4.1 这里需要完成比较难以理解的Container请求的比例放大保证比例最大的那个Host(这里是Host1 和 Host 2)的比例值是需要申请的Container的数量以满足随后为每一个Container构造其Locality信息的过程 var preferredLocalityRatio updatedHostToContainerCount.map { case(k, ratio) val adjustedRatio ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio(k, adjustedRatio.ceil.toInt) // 往上取整}如下图所示这里就是完成Round Up 这一步骤将需要新分配的Container数量成比例放大保证Container比例最大的Host这里是Host1 和 Host 2放大以后的值刚好等于需要分配的有Locality Preference 的 Container的总数量。 Host 1Host 2Host 3Host 420 Tasks20202010 Tasks101010Sum of Tasks30302010Sum of Containers1515105Allocated Container Target5542Newly Allocated Container4431Round Up1212934.2 放大完成以后开始进行分配。 当前有12 个Container需要分配每一个Host的分配比例为(12,12,9,3)。分配过程上文已经经过其代码如下 // 每个有locality需求的的Container request为他们确定对应的hosts和rackfor (i - 0 until requiredLocalityAwareContainerNum) {// Only filter out the ratio which is larger than 0, which means the current host can// still be allocated with new container request.val hosts preferredLocalityRatio.filter(_._2 0).keys.toArray // 还有container可以分配的一个或者多个hostsval racks hosts.map { h resolver.resolve(yarnConf, h) // 解析这些host所在的rack}.toSet// 每一个ContainerLocalityPreferences代表一个ContainercontainerLocalityPreferences ContainerLocalityPreferences(hosts, racks.toArray)// Minus 1 each time when the host is used. When the current ratio is 0,// which means all the required ratio is satisfied, this host will not be allocated again.preferredLocalityRatio preferredLocalityRatio.map { case (k, v) (k, v - 1) }}申请资源以后的处理Executor的启动或者结束 上面讲过资源调度的入口方法allocateResources()会通过updateResourceRequests()来计算所需资源并向Yarn进行资源的更新包括申请新的资源、释放无用的资源等 def allocateResources(): Unit synchronized {updateResourceRequests() // val allocateResponse amClient.allocate(progressIndicator)val allocatedContainers allocateResponse.getAllocatedContainers()handleAllocatedContainers(allocatedContainers.asScala)val completedContainers allocateResponse.getCompletedContainersStatuses()processCompletedContainers(completedContainers.asScala)}}通过调用Yarn的标准API allocate()获取了资源分配的结果。再次强调Yarn这一端的资源调度是异步调度因此这个资源分配的结果并不是刚刚通过addContainerRequest()进行资源申请的结果只是调用者在两次调用allocate() API的之间Yarn对于这个Application的新的资源分配结果。拿到了分配的ContainerSpark就可以将Executor启动起来了(注意是启动一个空的Executor不是启动Task)。启动起来的Executor随后就会向DriverEndpoint注册自己通信的详细过程参考TODO。这里不再赘述。 对分配结果的处理主要是处理已经分配的Container以及已经运行结束的Container: val allocatedContainers allocateResponse.getAllocatedContainers()handleAllocatedContainers(allocatedContainers.asScala)val completedContainers allocateResponse.getCompletedContainersStatuses()processCompletedContainers(completedContainers.asScala)对于已经分配的Container需要从Yarn的AMRMClient中将对应的资源请求删除避免对同一个资源进行多次重复申请然后启动对应的Executor。对于已经完成的Container需要根据Container的退出状态记录相关日志同时需要向Driver发送RemoveExecutor消息告知Driver这个Container的结束Driver端会进行相关状态的维护。 对于新启动的Container的处理 对于一个刚刚分配成功的Container其处理工作主要包括两个 一是从AMRMClient中将对应的资源请求删除避免同一资源请求的Container被重复申请然后在远程的NodeManager节点上启动Container。 这些过程在方法handleAllocatedContainers()中进行 --------------------------------- YarnAllocator -------------------------------------- def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit {val containersToUse new ArrayBuffer[Container](allocatedContainers.size)// 先处理Host Match的Containerval remainingAfterHostMatches new ArrayBuffer[Container]for (allocatedContainer - allocatedContainers) {matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,containersToUse, remainingAfterHostMatches)}// 处理Host Match以后剩余的Containerval remainingAfterRackMatches new ArrayBuffer[Container]for (allocatedContainer - remainingAfterHostMatches) {val rack resolver.resolve(conf, allocatedContainer.getNodeId.getHost)matchContainerToRequest(allocatedContainer, rack, containersToUse,remainingAfterRackMatches)}// 处理Host Match和Rack Match以后剩余的Containerval remainingAfterOffRackMatches new ArrayBuffer[Container]for (allocatedContainer - remainingAfterRackMatches) {matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,remainingAfterOffRackMatches)}// 在Host MatchRack Match以及ANY_HOST Match以后依然还有剩余的Container这只能是Bugif (!remainingAfterOffRackMatches.isEmpty) {for (container - remainingAfterOffRackMatches) {internalReleaseContainer(container)}}/*** 在这里会打印 Launching container container_1714042499037_5294_01_000002 on host*/runAllocatedContainers(containersToUse)}遍历每一个分配的Container在AMRMClient端找到跟这个Container所在的机器相匹配的资源请求将这个资源请求AMRMClient中删除。这个删除操作并不会和远程的ResourceManager通信因为这些资源请求都通过addContainerRequest() API被AMRMClient保存在本地然后通过allocate() API发送给远程的RM的。因此对应请求的删除是在AMRMClient的本地进行的:val remainingAfterHostMatches new ArrayBuffer[Container] for (allocatedContainer - allocatedContainers) {matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,containersToUse, remainingAfterHostMatches) }其中matchContainerToRequest()方法就是根据这个分配成功的Container的特性(PriorityVCore CPUHost Location)从AMRMClient中删除对应的ResourceRequest。未删除的Container保存在参数remainingAfterHostMatches中--------------------------------- YarnAllocator --------------------------------------private def matchContainerToRequest(allocatedContainer: Container,location: String,containersToUse: ArrayBuffer[Container],remaining: ArrayBuffer[Container]): Unit {// 这个Container的资源特性val matchingResource Resource.newInstance(allocatedContainer.getResource.getMemory,resource.getVirtualCores)// 以PriorityResource(VCore, Memory)location作为ID删除这个Container对应的资源请求val matchingRequests amClient.getMatchingRequests(allocatedContainer.getPriority, location,matchingResource)if (!matchingRequests.isEmpty) { // 匹配成功val containerRequest matchingRequests.get(0).iterator.nextamClient.removeContainerRequest(containerRequest) // 从AMRMClient中删除containersToUse allocatedContainer} else {remaining allocatedContainer // 未匹配的Container放入remaining接着进行其他匹配}}遍历剩下的在Host级别没有匹配成功的剩余的Container在Rack级别进行Container和ResourceRequest 的匹配并将匹配不成功的Container保存在remainingAfterRackMatches中val remainingAfterRackMatches new ArrayBuffer[Container] for (allocatedContainer - remainingAfterHostMatches) {/*** SparkRackResolver.*/val rack resolver.resolve(conf, allocatedContainer.getNodeId.getHost)matchContainerToRequest(allocatedContainer, rack, containersToUse,remainingAfterRackMatches) }遍历剩下的在Rack级别没有匹配成功的剩余的Container在ANY_HOST(忽略本地偏好)级别进行Container和ResourceRequest 的匹配(这种ResourceRequest是那种没有Locality需求的ResourceRequest)并将匹配不成功的Container保存在remainingAfterRackMatches// Assign remaining that are neither node-local nor rack-local val remainingAfterOffRackMatches new ArrayBuffer[Container] for (allocatedContainer - remainingAfterRackMatches) {matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,remainingAfterOffRackMatches) }如果有的已经分配的Container无论是Host、Rack还是ANY_HOST偏好都没有在AMRMClient本地找到他们匹配的资源请求(注释中说这是由于Yarn本身的竞争导致的bug)那么释放这些Container:if (!remainingAfterOffRackMatches.isEmpty) {for (container - remainingAfterOffRackMatches) {internalReleaseContainer(container)} }资源请求释放完毕以后通过方法runAllocatedContainers()逐个启动每一个Container。---------------------------------- YarnAllocator ----------------------------------------private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit {for (container - containersToUse) { // 对于已经allocate并且资源已经匹配的containerexecutorIdCounter 1val executorHostname container.getNodeId.getHostval containerId container.getIdval executorId executorIdCounter.toString // 分配executorIddef updateInternalState(): Unit synchronized {numExecutorsRunning.incrementAndGet()numExecutorsStarting.decrementAndGet()executorIdToContainer(executorId) container // executor 和 container的映射关系containerIdToExecutorId(container.getId) executorId // container 和 executor 的映射关系/*** Container launch起来以后更新allocatedHostToContainersMap*/val containerSet allocatedHostToContainersMap.getOrElseUpdate(executorHostname,new HashSet[ContainerId])containerSet containerIdallocatedContainerToHostMap.put(containerId, executorHostname)}numExecutorsStarting.incrementAndGet()launcherPool.execute(new Runnable {override def run(): Unit {try {new ExecutorRunnable(........).run() // 运行ExecutorRunnable用来和NodeManager通信来启动ContainerupdateInternalState()}})}}5.1 在线程池中启动一个ExecutorRunnable。ExecutorRunnable会负责和NodeManager进行通信在对应节点上将Container启动起来launcherPool.execute(new Runnable {override def run(): Unit {try {new ExecutorRunnable(Some(container),conf,sparkConf,driverUrl,executorId,executorHostname,executorMemory,executorCores,appAttemptId.getApplicationId.toString,securityMgr,localResources).run()5.2 在ExecutorRunnable启动以后由于新的Container的加入更新相关元数据信息包括executor - containercontainer - executor container - host host - container的映射关系这是通过内部方法updateInternalState()来负责的---------------------------------- YarnAllocator ----------------------------------------def updateInternalState(): Unit synchronized {numExecutorsRunning.incrementAndGet()numExecutorsStarting.decrementAndGet()executorIdToContainer(executorId) container // executor 和 container的映射关系containerIdToExecutorId(container.getId) executorId // container 和 executor 的映射关系/*** Container launch起来以后更新allocatedHostToContainersMap*/val containerSet allocatedHostToContainersMap.getOrElseUpdate(executorHostname,new HashSet[ContainerId])containerSet containerIdallocatedContainerToHostMap.put(containerId, executorHostname)}对于结束的Container的处理 对于结束的Container的处理在方法processCompletedContainers()中进行 ---------------------------------- YarnAllocator ---------------------------------------- private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit {for (completedContainer - completedContainers) {val containerId completedContainer.getContainerIdval alreadyReleased releasedContainers.remove(containerId) // alreadyReleased记录这个Container是否已经被释放val hostOpt allocatedContainerToHostMap.get(containerId)val onHostStr hostOpt.map(host s on host: $host).getOrElse()val exitReason if (!alreadyReleased) { // 这个Container还没有释放那么走释放流程// Decrement the number of executors running. The next iteration of// the ApplicationMasters reporting thread will take care of allocating.numExecutorsRunning.decrementAndGet()// Hadoop 2.2.X added a ContainerExitStatus we should switch to use// there are some exit status we shouldnt necessarily count against us, but for// now I think its ok as none of the containers are expected to exit.val exitStatus completedContainer.getExitStatusval (exitCausedByApp, containerExitReason) exitStatus match {case ContainerExitStatus.SUCCESS .....case ContainerExitStatus.PREEMPTED ....case VMEM_EXCEEDED_EXIT_CODE ....case PMEM_EXCEEDED_EXIT_CODE ....}if (exitCausedByApp) {logWarning(containerExitReason)} else {logInfo(containerExitReason)}ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)} else {// 如果我们释放了这个Container那么说明一定是Driver直接通过 killExecutor// 释放掉了这个Container而不是它自行结束ExecutorExited(completedContainer.getExitStatus, exitCausedByApp false,sContainer $containerId exited from explicit termination request.)}// 解除host - container 以及 container - host mappingfor {host - hostOpt // 这个Container对应的HostcontainerSet - allocatedHostToContainersMap.get(host) // 这个Container对应的Host上的所有container} {containerSet.remove(containerId) // 删除这个containerif (containerSet.isEmpty) { // 这个container是这个host上的最后一个containerallocatedHostToContainersMap.remove(host) // 删除host} else {allocatedHostToContainersMap.update(host, containerSet)}allocatedContainerToHostMap.remove(containerId) // 解除container - host map}// 解除container - executor mappingcontainerIdToExecutorId.remove(containerId).foreach { eid executorIdToContainer.remove(eid)....if (!alreadyReleased) {// The executor could have gone away (like no route to host, node failure, etc)// Notify backend about the failure of the executornumUnexpectedContainerRelease 1driverRef.send(RemoveExecutor(eid, exitReason))}}} }可以看到方法processCompletedContainers()会遍历Yarn返回的每一个Completed(注意Completed只是代表Container运行结束但是运行结果可能是Succeed可能是Fail)然后逐个处理 如果Container此时并没有被释放说明Container是自行结束而不是Driver所杀死的。根据Container的退出状态和退出原因打印日志 val alreadyReleased releasedContainers.remove(containerId) // alreadyReleased记录这个Container是否已经被释放 val exitReason if (!alreadyReleased) {val alreadyReleased releasedContainers.remove(containerId) // alreadyReleased记录这个Container是否已经被释放val exitStatus completedContainer.getExitStatusval (exitCausedByApp, containerExitReason) exitStatus match {case ContainerExitStatus.SUCCESS .....case ContainerExitStatus.PREEMPTED ....case VMEM_EXCEEDED_EXIT_CODE ....case PMEM_EXCEEDED_EXIT_CODE ....}if (exitCausedByApp) {logWarning(containerExitReason)} else {logInfo(containerExitReason)}}如果我们发现这个Container已经在ReleasedContainer中存在说明只能是Driver通过KillExecutor的方式将Container给Release了而不是Container自行退出 // 如果我们释放了这个Container那么说明一定是Driver直接通过 killExecutor// 释放掉了这个Container而不是它自行结束ExecutorExited(completedContainer.getExitStatus, exitCausedByApp false,sContainer $containerId exited from explicit termination request.)如果是Driver杀死了ExecutorDriver会向AMEndpoint发送KillExecutor消息AMEndpoint会将这个Executor从其维护的元数据信息中删除将这个Kill掉的Executor的Container添加到releasedContainers中同时通过AMRMClient向Yarn发送释放container的请求 def killExecutor(executorId: String): Unit synchronized {val container executorIdToContainer.get(executorId).getinternalReleaseContainer(container)numExecutorsRunning.decrementAndGet()}private def internalReleaseContainer(container: Container): Unit {releasedContainers.add(container.getId()) // 将这个Container从releasedContainer中删除amClient.releaseAssignedContainer(container.getId()) // 向Yarn发送释放Container的请求}删除这个Container和Host之间的映射关系包括Host - Container的映射关系以及反向的Container- Host的映射关系 for {host - hostOpt // 这个Container所在的HostscontainerSet - allocatedHostToContainersMap.get(host) // 这个Host上的所有Container} {containerSet.remove(containerId)if (containerSet.isEmpty) {allocatedHostToContainersMap.remove(host)} else {allocatedHostToContainersMap.update(host, containerSet)}allocatedContainerToHostMap.remove(containerId)}删除Container和Executor之间的映射关系同时如果不是Driver主动release的这个container那么会向Driver发送RemoveExecutor消息 containerIdToExecutorId.remove(containerId).foreach { eid executorIdToContainer.remove(eid)........if (!alreadyReleased) {// 这个Container不是Driver自行释放的那么需要像Driver汇报一个RemoveExecutor消息driverRef.send(RemoveExecutor(eid, exitReason))}基于资源分配结果进行任务调度 上面讲到对于新分配的Container的处理在收到Yarn返回的分配的Container以后ApplicationMaster会启动对应的Executor。这些Executor启动以后会向Driver注册自己以告知Driver自己的存在Driver进而将Task调度到Executor中。 其实Task的调度的触发不仅仅是新分配了Container或者新Launch了Executor基本上在集群的资源可能发生变化的情况下都会触发Task的调度因此Task的调度是一个不断将Pending Task与可用资源进行匹配然后调度出去的过程。 我列举了下面四种可以触发Driver端的CoarseGrainedSchedulerBackend通过运行makeOffers(或者只针对某一个Executor的makeOffer)来进行任务调度 来自Executor的注册上面说过Executor启动以后会向Driver注册自己。此时Driver认为集群中有了新的可用资源因此尝试进行Task到Executor的调度。Executor的注册是通过Executor启动的时候向DriverEndpoint发送RegisterExecutor消息来触发的。 Driver将Executor能够提供的可用资源(Memory , CPU)叫做Resource Offer。因此当收到了新的Executor的注册Driver端会调用makeOffers()方法为这个Executor生成对应的WorkerOffer代表这个Executor剩余可用的CPU和Memory: ------------------------------------ DriverEndpoint ------------------------------- override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] {// 收到 RegisterExecutor 请求这个请求发生在Executor启动以后向Driver发送的信息case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) ......executorRef.send(RegisteredExecutor).....makeOffers() // 尝试进行task的调度针对所有可用的executor}来自Executor的StatusUpdate在Executor上Task的运行状态发生变化都会告知DriverDriver认为此时集群的资源状态发生了变化因此尝试进行一次task的调度。但是这次的Task的调度是针对这个Executor的即只会调度适合运行在这个Executor上的Pending Task到这个Exeuctor上 ------------------------------------ DriverEndpoint ----------------------------------- override def receive: PartialFunction[Any, Unit] {case StatusUpdate(executorId, taskId, state, data) scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {executorDataMap.get(executorId) match {case Some(executorInfo) executorInfo.freeCores scheduler.CPUS_PER_TASKmakeOffers(executorId) // 尝试进行Task的调度但是只针对这一个Executor...}来自TaskScheduler中的任务的相关变化比如Task的提交Executor的丢失Task的失败推测执行等等由于系统的可用资源发生了变化因此TaskScheduler都会通过向DriverEndpoint发送ReviveOffers消息以触发新一轮的Pending Task的调度。 ----------------------------------- DriverEndpoint --------------------------------override def receive: PartialFunction[Any, Unit] {.....case ReviveOffers makeOffers()来自SchedulerBackend的自我触发在DriverEndpoint作为一个RpcEndpoint启动的时候会启动一个ReviveThread以固定频率向自己发送ReviveOffers的本地消息(发送给自己)以触发Pending 的 Task到Container的调度 -------------------------------- DriverEndpoint ---------------------------------------override def onStart() {// 定期恢复offer以允许延迟调度工作// Periodically revive offers to allow delay scheduling to workval reviveIntervalMs conf.getTimeAsMs(spark.scheduler.revive.interval, 1s)reviveThread.scheduleAtFixedRate(new Runnable {override def run(): Unit Utils.tryLogNonFatalError {Option(self).foreach(_.send(ReviveOffers))}}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)}在以Locality为考虑重点的Task的调度就是根据locality从高到低(executor-local 优先级最高)参照当前允许的优先级取出对应的Task进行调度。如果当前优先级的Task已经调度完毕或者当前locality的一部分或者全部Task经过了很久还没有完成调度(即当前的系统资源无法完全满足当前的locality需求)那么就需要降低locality再次尝试进行调度。 下文会详细讲解makeOffers()的具体流程。 PendingTask的生成: Task的调度的重要逻辑是满足Task 的Location Preference 即每一个Task对运行位置 (Executor, Host, Rack等等)上的偏好。我们先看看TaskLocation的含义和生成过程然后看看Spark的Driver是怎样通过Locality-Aware的调度方式尽最大可能满足每一个Task的本地性需求。 TaskLocation的构造过程 在Spark中一个Task的Location归根结底是由这个Task的Split决定的Split代表了一个切分比如对一个HDFS文件的切分。在Hadoop上一个Split表示为一个InputSplit接口的实现类的一个实例。 一个HDFS文件的一个Split是由FileSplit对象表达对象中hostInfos存放了一个SplitLocationInfo数组每一个SplitLocationInfo对象存放了一个这个Split的一个Replica的Location信息(因为在HDFS上文件是多副本的)包括具体的Hostname(即DataNode所在的节点)以及这个Host是否在内存中缓存了这个Split的标记位。 当然在HDFS的维度我们说一个Split被缓存其实缓存的是这个Split对应的Replica。在Hadoop中一个文件的Split对应一个FileSplit对象代码如下 --------------------------------------- FileSplit ------------------------------------------------public FileSplit(Path file, long start, long length, String[] hosts,String[] inMemoryHosts) {this(file, start, length, hosts);hostInfos new SplitLocationInfo[hosts.length];for (int i 0; i hosts.length; i) {// because N will be tiny, scanning is probably faster than a HashSetboolean inMemory false;for (String inMemoryHost : inMemoryHosts) {if (inMemoryHost.equals(hosts[i])) {inMemory true;break;}}hostInfos[i] new SplitLocationInfo(hosts[i], inMemory);}}其中一个Replica的Location信息对应了一个SplitLocationInfo对象包含了这个Split是否在对应的Host上缓存了以及对应的Host(DataNode) 信息。在多副本的环境下一个FileSplit对应的是一个数组 SplitLocationInfo[] ------------------------------------------ SplitLocationInfo ----------------------------------------- public class SplitLocationInfo {private boolean inMemory;private String location;public SplitLocationInfo(String location, boolean inMemory) {this.location location;this.inMemory inMemory;}我们看一下Spark在提交Stage和Task的过程中是怎么获取Task的Location信息的以及在获取以后是如何基于Location调度Task的。 在HDFS的场景下Spark是将每一个Stage对应的RDD(一个NewHadoopRDD对象)中的Partition(一个NewHadoopPartition对象)的Location信息(如果有)存放在这个NewHadoopPartition的serializableHadoopSplit中其实是对InputSplit的封装: --------------------------------------- NewHadoopPartition ------------------------------------ private[spark] class NewHadoopPartition(rddId: Int,val index: Int,rawSplit: InputSplit with Writable) // 这个HadoopPartition对应的InputSplitextends Partition {val serializableHadoopSplit new SerializableWritable(rawSplit)Stage以及Partition的生成是在提交以前进行的即执行计划的生成阶段形成的。那么到了Stage的提交阶段是怎么利用Stage和Partition中的Location信息生成具有Location Preference的Task信息的呢我们看看Stage提交的时候是如何使用Partition的Location信息的。 下面的这段代码是一段经典代码Spark中DagScheduler提交Stage的时候会首先检查这个Stage是否有未提交的Parent Stage 如果有会首先递归提交Parent Stage然后把当前Stage放入到waitingStages中。waitingStage指的是那些parentStage还没有完成、因此需要等待的Stage;如果没有Missing Parent Stage那么就可以提交当前的Stage /** Submits stage, but first recursively submits any missing parents. */private def submitStage(stage: Stage) {val jobId activeJobForStage(stage)if (jobId.isDefined) {if (!waitingStages(stage) !runningStages(stage) !failedStages(stage)) {val missing getMissingParentStages(stage).sortBy(_.id)if (missing.isEmpty) { // 没有missing parent stage那么提交这个Stage的所有tasksubmitMissingTasks(stage, jobId.get)} else {for (parent - missing) { // 有missing parent stage那么先提交submitStage(parent) }waitingStages stage // 将当前stage放入到waitingStages中随后提交}}} }在提交Stage的时候会提交这个Stage所有的Task。我们忽略其他细节从下面的代码可以看到Spark会通过方法getPreferredLocs()针对这个Stage的每一个Partition获取这个Partition的Location信息放到一个taskIdToLocations: Map[Int, Seq[TaskLocation]]中然后会提交这些已经带有Location信息的Task -------------------------------------- DAGScheduler --------------------------------------/** Called when stages parents are available and we can now do its task. */private def submitMissingTasks(stage: Stage, jobId: Int) {// First figure out the indexes of partition ids to compute.val partitionsToCompute: Seq[Int] stage.findMissingPartitions().....val taskIdToLocations: Map[Int, Seq[TaskLocation]] try {stage match {case s: ShuffleMapStage // 计算每一个partition(这里的partition就是一个task)的prefered location的listpartitionsToCompute.map { id (id, getPreferredLocs(stage.rdd, id))}.toMapcase s: ResultStage partitionsToCompute.map { id val p s.partitions(id) // ResultStage.partitions中存放的是对应的partition id(id, getPreferredLocs(stage.rdd, p))}.toMap}}// 在获取了这个Stage的每一个task的location preference信息(如果有)以后创建一个新的Stage Attempt// 随后在调度这个Stage的task的时候这个task中就包含了对应的location preferencestage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)// 提交这个Stage。基于listenerBus的事件触发机制调度方法就会将包含了Location Preference的Task调度出去listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)).......val tasks: Seq[Task[_]] try {val serializedTaskMetrics closureSerializer.serialize(stage.latestInfo.taskMetrics).array()stage match {case stage: ShuffleMapStage stage.pendingPartitions.clear()partitionsToCompute.map { id val locs taskIdToLocations(id)val part partitions(id)stage.pendingPartitions idnew ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)}case stage: ResultStage partitionsToCompute.map { id val p: Int stage.partitions(id)val part partitions(p)val locs taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptNumber,taskBinary, part, locs, id, properties, serializedTaskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)}}}if (tasks.size 0) {// 提交tasktaskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))} }submitMissingTasks()方法的基本过程为: 获取这个Stage需要进行计算的所有partitions信息 // First figure out the indexes of partition ids to compute.val partitionsToCompute: Seq[Int] stage.findMissingPartitions()这里ResultStage和ShuffledMapStage对于findMissingPartitions()的实现有区别主要是ResultStage中有些函数需要使用的Partition数量不等于这个ResultStage对应的rdd的partition的数量比如我们常见的first()、last()等。所以ResultsStage有一个partitions数组数组中的每一个值代表其实际需要使用的partition id:private[spark] class ResultStage(id: Int,rdd: RDD[_],val func: (TaskContext, Iterator[_]) _,val partitions: Array[Int], // 实际需要的partitionpartitions数组中的每一个值是实际使用的partition在rdd.partitions中的partition idparents: List[Stage],firstJobId: Int,callSite: CallSite)extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) { 所以对于ResultStage.findMissingPartitions()方法其返回的是ResultStage.partitions的index的集合ResultStage.partitions中在索引index处存放的是对应的Partition ID。比如ResultStage.partitions[0] 2意味着Partition ID 2。而对于ShuffleMapStage返回的直接就是ShuffleMapStage的RDD的partitions数组这个数组中index 和 value是相同的即索引0的位置存放的是partition 0索引1的位置存放的是partition 1.------------------------------------ ResultStage ------------------------------------------override def findMissingPartitions(): Seq[Int] {val job activeJob.get// 如果是ResultStage那么实际参与计算的numPartitions不一定等于stage.rdd.partitions// 因为像first() last()这种function不需要所有的partition参与// 但是对于ShuffleMapStagestage.rdd中所有的partition都需要参与计算(0 until job.numPartitions).filter(id !job.finished(id))}-------------------------------------- ShuffleMapStage -----------------------------------val numPartitions rdd.partitions.length // 对于ShuffleMapStagenumPartitions就是这个Stage对应的RDD的partitions的长度override def findMissingPartitions(): Seq[Int] {mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)}---------------------------------- ActiveJob ---------------------------------------/*** 我们需要为这个job计算的partition的数量. 必须注意ResultStage可能并不需要计算目标RDD中所有的partition比如first()或者lookup()等函数*/val numPartitions finalStage match {// ResultStage的partiton数量直接使用ResultStage构造的时候所传入的Partition的数量这个partitions是ResultStage对应的rdd或者是它的子集case r: ResultStage r.partitions.length// 对于ShuffleMapStage它的partitino数量就是对应的ShuffleMapStage.RDD.partitions的长度case m: ShuffleMapStage m.rdd.partitions.length}生成从partitionID 到 对应的Prefered Location的映射关系遍历这个Stage的所有Partition从Partition中提取出对应的Location Preference生成一个从Task 到 Seq[TaskLocation]的映射关系。每一个Task对应了多个TaskLocation每一个 TaskLocation对应了一个位置信息。基于上面讲到的ResultStage的特殊性它和ShuffleMapStage在获取Partition ID的时候不同 如果是ShuffleMapStage这个index就是Partition ID所以直接调用了id, getPreferredLocs(stage.rdd, id)如果是ResultStage由于上面讲到的原因需要先通过val p s.partitions(id)获取这个index对应的Partition ID: val taskIdToLocations: Map[Int, Seq[TaskLocation]] try {stage match {case s: ShuffleMapStage // 计算每一个partition(这里的partition就是一个task)的prefered location的listpartitionsToCompute.map { id (id, getPreferredLocs(stage.rdd, id))}.toMapcase s: ResultStage partitionsToCompute.map { id val p s.partitions(id) // 这里的id其实只是ResultStage.partitions的索引对应的值才是Partition ID(id, getPreferredLocs(stage.rdd, p)) // 根据rdd和partitionsID获取对应的preferered location}.toMap}}创建一个新的Stage Attempt。一次Stage Attempt代表的就是一次Stage的运行如果Stage运行失败需要重试那么就进入下一个 Stage Attempt。这里主要是更新Stage中的_latestInfo代表当前的Stage Attempt的信息:stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)----------------------------------------- Stage ------------------------------------------def makeNewStageAttempt(numPartitionsToCompute: Int,taskLocalityPreferences: Seq[Seq[TaskLocation]] Seq.empty): Unit {val metrics new TaskMetricsmetrics.register(rdd.sparkContext)// taskLocalityPreferences是这一个Seq[Seq[TaskLocation]]代表了每一个task的prefered location// 一个task的prefered location可能不止一个_latestInfo StageInfo.fromStage(this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)nextAttemptId 1}为这个Stage生成对应的Task(只是生成还没有提交): val tasks: Seq[Task[_]] try {val serializedTaskMetrics closureSerializer.serialize(stage.latestInfo.taskMetrics).array()stage match {case stage: ShuffleMapStage stage.pendingPartitions.clear()partitionsToCompute.map { id val locs taskIdToLocations(id)val part partitions(id)stage.pendingPartitions idnew ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)}case stage: ResultStage partitionsToCompute.map { id val p: Int stage.partitions(id)val part partitions(p)val locs taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptNumber,taskBinary, part, locs, id, properties, serializedTaskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)}}}构造Task的时候对应的Location Preference信息存入了Task对象的locs成员中通过调用preferredLocs可以获取这个Task的Location Preference随后Spark根据Task的preferredLocs进行资源请求并且在申请到了资源以后根据preferredLocs将Task调度到合适的Executor上去:private[spark] class ShuffleMapTask(stageId: Int,stageAttemptId: Int,taskBinary: Broadcast[Array[Byte]],partition: Partition, // Partitiontransient private var locs: Seq[TaskLocation], // Location Preference......)// 获取对应Task的Location Preference用来进行调度决策transient private val preferredLocs: Seq[TaskLocation] {if (locs null) Nil else locs.toSet.toSeq}提交所有的Task if (tasks.size 0) {// 提交tasktaskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))} 下面的代码显示了getPreferredLocsInternal()方法的细节即获取一个RDD中某一个Partition的Location信息(一个Seq[String])返回对应的TaskLocation接口的某个实现类的Seq -------------------------------------- DAGScheduler --------------------------------------private def getPreferredLocsInternal(rdd: RDD[_], // 当前Stage的RDDpartition: Int, // 这个Partitoin的index,即在rdd.partitions中的indexvisited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] {// If the partition has already been visited, no need to re-visit.// This avoids exponential path exploration. SPARK-695if (!visited.add((rdd, partition))) {// Nil has already been returned for previously visited partitions.return Nil}// 如果这个partition已经缓存那么就返回缓存以后的Partion的Location信息val cached getCacheLocs(rdd)(partition)if (cached.nonEmpty) {return cached}// 如果这个RDD含有一个Location Preference那么就返回这个Location Preferenceval rddPrefs rdd.preferredLocations(rdd.partitions(partition)).toListif (rddPrefs.nonEmpty) {return rddPrefs.map(TaskLocation(_)) // 这里会根据getPreferredLocations()的返回值Seq[String]将List中的每一个值映射到一个TaskLocation的实现类即一个Seq[TaskLocation]}TaskLocation的apply()方法会根据传入的参数的类型返回其对应的不同实现类 ---------------------------------- TaskLocation ------------------------------------/*** Create a TaskLocation from a string returned by getPreferredLocations.* These strings have the form executor_[hostname]_[executorid], [hostname], or* hdfs_cache_[hostname], depending on whether the location is cached.*/def apply(str: String): TaskLocation {val hstr str.stripPrefix(inMemoryLocationTag)if (hstr.equals(str)) {if (str.startsWith(executorLocationTag)) {val hostAndExecutorId str.stripPrefix(executorLocationTag)val splits hostAndExecutorId.split(_, 2)val Array(host, executorId) splitsnew ExecutorCacheTaskLocation(host, executorId)} else {new HostTaskLocation(str)}} else {new HDFSCacheTaskLocation(hstr)}} }可以看到rdd.partitions(partition)方法根据这个Partition Index返回了这个Partition对象。对于NewHadoopRDD这个Partition的实现是NewHadoopPartition。一个NewHadoopPartition对象中存放了对应的InputSplit接口的实现类。比如对于一个文件InputSplit的实现是FileSplit而InputSplit则包含了对应的Location信息: 然后调用对应的RDD的getPreferredLocations()方法获取Location Preference即一个NewHadoopRDD中包含了所有的Partition每一个Partition由一个Partition接口的实现类NewHadoopPartition对象表示每个NewHadoopPartition中存放了这个Partition的InputSplit信息 ----------------------------------- NewHadoopRDD ---------------------------------------override def getPreferredLocations(hsplit: Partition): Seq[String] {// 返回这个 NewHadoopPartition对应的Hadoop层面的Split信息比如一个文件的FileSplit// 包含了这个File中这个split的起始位置长度replica的位置val split hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value// 提取这个FileSplit的多个Replica位置信息val locs HadoopRDD.convertSplitLocationInfo(split.getLocationInfo)locs.getOrElse(split.getLocations.filter(_ ! localhost))}在Spark端在生成每一个RDD中的Partition的信息的时候其映射关系其实从底向上为 HDFS Split(InputSplit) - Partition(NewHadoopPartition) - Task 的关系因此在生成RDD Partition的时候会将对应的Split的Location信息转换成Partition的Location信息这个Partition的Location信息使用TaskLocation对象的具体实现类来表达的。这个转换是由object方法convertSplitLocationInfo()负责的该方法的输入是一个Array[SplitLocationInfo]输出为对应的Seq[String]即对应的Location的字符串表示即: 如果Split的是内存缓存则构造HDFSCacheTaskLocation的location对象代表这个partition已经被HDFS的对应Host缓存了。很显然对应的Task如果直接调度到这台机器则会提高读取效率;如果Split不是内存缓存则构造HostTaskLocation显然Task调度到这条机器上读取效率会更高(没有跨机器的网络带宽如果打开了短路读还可以利用HDFS的短路读特性)---------------------------------- HadoopRDD ---------------------------------------private[spark] def convertSplitLocationInfo(infos: Array[SplitLocationInfo]): Option[Seq[String]] {Option(infos).map(_.flatMap { loc val locationStr loc.getLocationif (loc.isInMemory) {Some(HDFSCacheTaskLocation(locationStr).toString)} else {Some(HostTaskLocation(locationStr).toString)}})}TaskLocation的实现类除了HDFSCacheTaskLocation和HostTaskLocation还有ExecutorCacheTaskLocation即这个Task希望运行在这个Executor上比如下面两种情况这个Partition的Locality Preference是希望精确到对应的Executor的: 在Streaming的Task的调度中对于Receiver的调度为了均匀调度会首先将所有的Task均匀调度到Host上剩下的Task均匀调度到Executor上。这不不在赘述详细逻辑可以查看ReceiverSchedulingPolicy的scheduleReceivers()方法;这个Partition在这个Executor上被缓存因此对应的Task肯定需要精确运行在对应的Executor上。这里不在赘述对应的方法为DAGScheduler.getCacheLocs() private[scheduler]def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] cacheLocs.synchronized {// Note: this doesnt use getOrElse() because this method is called O(num tasks) timesif (!cacheLocs.contains(rdd.id)) {// Note: if the storage level is NONE, we dont need to get locations from block manager.val locs: IndexedSeq[Seq[TaskLocation]] if (rdd.getStorageLevel StorageLevel.NONE) {IndexedSeq.fill(rdd.partitions.length)(Nil) // 如果对应RDD的StorageLevel是NONE那么返回一个空个的TaskLocation数组} else { // 如果RDD的StorageLevel不是空的那么会构造对应的ExecutorCacheTaskLocationval blockIds rdd.partitions.indices.map(index RDDBlockId(rdd.id, index)).toArray[BlockId]blockManagerMaster.getLocations(blockIds).map { bms // 通过BlockManagerMaster获取对应的Block的位置然后构造对应的ExecutorCacheTaskLocationbms.map(bm TaskLocation(bm.host, bm.executorId))}}cacheLocs(rdd.id) locs}cacheLocs(rdd.id)}对应的TaskLocation接口的实现类的toString()方法就返回了这个TaskLocation的String表示。比如HostTaskLocation.toString()就返回对应的Hostname: ----------------------------------------- HostTaskLocation ---------------------------------- /*** A location on a host.*/ private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {override def toString: String host }-------------------------------------- HDFSCacheTaskLocation ---------------------------------- /*** A location on a host that is cached by HDFS.*/ private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {override def toString: String TaskLocation.inMemoryLocationTag host } 这样DAGScheduler.submitMissingTasks()就会提交这些带有Location Preference的Task。这些Task会在Driver端经过一些统计计算让ApplicationMaster根据提供的资源量、Task的位置倾向等向Yarn申请资源申请到的资源向Driver注册Driver再将Task和Container进行最有匹配最大程度满足Task的位置需求。 根据TaskLocation信息将Task添加到不同的pendingTask数组中 所有需要运行因此需要相应资源的TaskDriver都会将对应的Task添加到pendingTask中这些pendingTask是由TaskSetManager维护的一个TaskSetManager对象是一个Stage的任务集合主要负责这个Stage的Task的管理和调度。 根据每一个Task的资源本地性需求的不同pendingTask分别维护在下面的Map中代码如下: // 希望在某一个Executor上运行的pendingTasksKey是Executor IDValue是希望在这个Executor上运行的TaskID的列表private val pendingTasksForExecutor new HashMap[String, ArrayBuffer[Int]]// 希望在某一个Host上运行的pendingTasks,Key是对应的HostValue是希望在这个Host上运行的TaskID的列表private val pendingTasksForHost new HashMap[String, ArrayBuffer[Int]]// 希望在某个一Rack上运行的pendingTasksKey是对应的RackValue是希望在这个Rack上运行的TaskID的列表private val pendingTasksForRack new HashMap[String, ArrayBuffer[Int]]// 没有任何的本地性需求的pendingTasks的列表private[scheduler] var pendingTasksWithNoPrefs new ArrayBuffer[Int]向PendingTask中添加任务的时候会根据这个task的Locality Preference将对应的pendingTask放入到上面不同的任务集合中 /** 将一个Task添加到它所属的所有的pending-task lists中去 */private[spark] def addPendingTask(index: Int) {for (loc - tasks(index).preferredLocations) {loc match {case e: ExecutorCacheTaskLocation // Executor级别的Location PreferencependingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) indexcase e: HDFSCacheTaskLocation // HDFS缓存 级别的Location Preferenceval exe sched.getExecutorsAliveOnHost(loc.host) // 获取这个location所在的host上的所有executor idexe match {case Some(set) for (e - set) { // 对于这个host上的每一个executor// 将这个task添加到这个host上的每一个executor上去pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) index}}case _ }// 将这个task添加到pendingTasksForHostpendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) indexfor (rack - sched.getRackForHost(loc.host)) {pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) index}} // end of for loopif (tasks(index).preferredLocations Nil) {pendingTasksWithNoPrefs index}allPendingTasks index // No point scanning this whole list to find the old task there}其基本逻辑为: 遍历所有的Task根据对应Task的Location Preference将对应的Task添加到对应的pendingLocation数组中: for (loc - tasks(index).preferredLocations) { ......}如果是ExecutorCacheTaskLocation那么就将Task添加到pendingTasksForExecutor中 case e: ExecutorCacheTaskLocation pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) index如果是HDFSCacheTaskLocation并且这个Host上的确有active的Executor那么就将这个task添加到这个Host上的每一个Executor上即将这个Task添加到pendingTasksForExecutor中。但是显然这个Task最终只会在这个Host中的某一个Executor上运行一次而不会重复运行 val exe sched.getExecutorsAliveOnHost(loc.host) // 获取这个Host上所有的alive executorsexe match {case Some(set) for (e - set) { // 对于这个host上的每一个executor// 将这个task添加到这个host上的每一个executor上去pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) index}无论是ExecutorCacheTaskLocation还是HDFSCacheTaskLocation都将这个Task添加到pendingTasksForHost中因为这个Task肯定会运行着这个Host上的。 pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) index获取这个Host所在的Rack然后添加到这个对应的pendingTasksForRack中 for (rack - sched.getRackForHost(loc.host)) {pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) index}如果这个Task没有任何location preference那么就添加到pendingTasksWithNoPrefs中 if (tasks(index).preferredLocations Nil) {pendingTasksWithNoPrefs index}无论怎样将这个task添加到allPendingTasks中这是当所有的locality都无法满足要求一户最坏的Locality选择。 总之我们需要注意: 区分pendingTasksWithNoPrefs和allPendingTasks前者意味着Task本身对locality没有任何要求因此可以随意将Task进行分配。而allPendingTasks中存放了所有的task意味着当我们无法满足task的localtiy要求以后的不得已的选择。比如一个在pendingTasksForRack中的task的locality要求无法满足只能退化到选择其他Rack来运行该TaskpendingTasksForExecutor、pendingTasksForHost、pendingTasksForRack之间存在的包含关系即如果一个Task是pendingTasksForExecutor那么也会放入到这个Executor所在的host(pendingTasksForHost)和所在的rack(pendingTasksForRack)中同理如果一个Task的pendingTasksForHost也会放入到这个Host所在的Rack(pendingTasksForRack)中。而且所有的Task都会放入到allPendingTasks中。我们从后面的locality退化可以看到原因即如果某个locality要求下无法分配完所有的task那么会退化到低一级的优先级而低一级的优先级列表中肯定包含了高一级优先级列表的Task因此这个Task可以在低一级进行调度。我们可以看到pendingTasksForRack中的Task并不是这个task的Locality Preference直接就是就是这个rack而是这个task的location preference是对应的这个Rack中的某一个Host因此将这个Host所在的Rack添加为这个Task的Rack Locality。后面会看到Rack locality是比no preference更低一级的locality其实就是这个原因尽管表面上看起来比较疑惑也与比如Yarn的任务调度逻辑似乎不同。 可用的LocationLevel的计算 Spark用一个枚举类型TaskLocality来表达不同的本地级别 object TaskLocality extends Enumeration {// Process local is expected to be used ONLY within TaskSetManager for now.val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY ValuePROCESS_LOCAL: 进程本地即这个Task希望调度到某一个特定的Executor上 NODE_LOCAL 节点本地即这个Task希望调度到某一个特定的节点 NO_PREF: 没有本地倾向即这个Task可以任意调度到集群中的任何节点 RACK_LOCAL 机架本地即这个Task希望调度到某一个特定的rack上 ANY: 无法满足要求是前面四种本地性倾向都无法满足要求时的最后的选择策略将有节点倾向的Task任意调度到集群中无法满足其本地性倾向的位置上。 这里需要注意的是: 显然越靠前数字越小Locality级别越高RACK_LOCAL并不指的是这些任务的locality preference中本来就指明了运行在这个Rack上而是NODE_LOCAL或者PROCESS_LOCAL的对应退化调度策略即如果对应的节点或者Executor由于资源问题无法满足其要求就会退化到使用RACK_LOCAL并不是Task本身声明调度到Rack上。NO_PREF的优先级比RACK_LOCAL的优先级更高这是因为NO_PREF指的是Task本身明确表明自己没有本地性偏好因此不是一种退化策略而是对其Locality Preference的一种完全的满足而RACK_LOCAL其实是一种退化策略因此RACK_LOCAL的优先级低于NO_PREFANY的优先级最低和RACK_LOCAL一样也是一种退化策略。 在一个TaskSet的TaskSetManager构造的时候以及后来一个新的Executor加入或者丢失的时候都会重新计算这个TaskSet的Valid Locality Levels。所以应该注意到 Valid Locality Levels 指的是根据当前的Pending Task的不同LOCALITY需求(比如不同locality需求的pending task分别放在了pendingTasksForExecutor、pendingTasksForHost、pendingTasksWithNoPrefs、pendingTasksForRack和allTasks中)计算我们目前可能需要的是哪些TaskLocality。这样在对Task进行基于Locality 的分配(下文会讲到)的时候只需要考虑这些Valid Locality Levels。必须注意computeValidLocalityLevels()是TaskSetManager的成员方法即这里是为某一个TaskSet计算对应的Valid Locality Levels 下面的方法显示了计算Valid Locality Levels的基本过程 ------------------------------------- TaskSetManager --------------------------------------private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] {import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}val levels new ArrayBuffer[TaskLocality.TaskLocality]// 如果有任务在等待执行器executor并且这些执行器中有活着的执行器那么就将PROCESS_LOCAL添加到有效级别列表中。if (!pendingTasksForExecutor.isEmpty pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {levels PROCESS_LOCAL}// 如果有任务在等待主机host并且这些主机上有活着的执行器那么就将NODE_LOCAL添加到有效级别列表中。if (!pendingTasksForHost.isEmpty pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {levels NODE_LOCAL}if (!pendingTasksWithNoPrefs.isEmpty) {levels NO_PREF}// 如果有任务在等待机架rack并且这些机架上有活着的主机那么就将RACK_LOCAL添加到有效级别列表中。if (!pendingTasksForRack.isEmpty pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {levels RACK_LOCAL}levels ANYlevels.toArray}该方法返回一个TaskLocality数组即如果对应的pendingTask数组中有Task那么就将对应的TaskLocality添加到数组中。这里需要注意往数组中添加TaskLocality是按照TaskLocality的值从低到高(Locality优先级从高到低)的顺序添加的从下文介绍基于Locality的Task调度可以看到调度时会遍历这个返回的TaskLocality数组即调度时按照Locality优先级从高到低进行的。最后的TaskLocality.ANY一定会最后添加到结果中作为一个优先级的保底操作。 基于Locality的Task调度 上文讲过Task的调度的触发以及其通过makeOffers()方法进行调度。在这里我们详细讲解这个调度的基本细节。 makeOffers()的基本功能就是为当前的Executor的资源剩余情况生成对应的WorkerOffer代表这些Executor可提供的运行资源其中包含了对应的executorId所在的host信息以及可用的vCore信息 private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)然后TaskScheduler会根据这些剩余资源将对应的pendingTask调度出去当然调度过程中需要依赖对应的Task的Locality信息。 -------------------------------------- CoarseGrainedSchedulerBackend -------------------------------private def makeOffers() {// 根据集群当前的可用资源状况生成Task的调度结果val taskDescs CoarseGrainedSchedulerBackend.this.synchronized {// 筛选出alive的executorval activeExecutors executorDataMap.filterKeys(executorIsAlive)val workOffers activeExecutors.map {case (id, executorData) new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toIndexedSeq // 构造每一个Executor上的可用资源scheduler.resourceOffers(workOffers) // 在这里依赖于TaskScheduler来调度对应的task到对应的worker上}launchTasks(taskDescs) // 启动这些tasks}resourceOffers()方法是TaskSchedulerImpl的成员方法其输入是一系列的WorkerOffer返回可以进行调度的所有Task(每一个Task由一个TaskDescription表示)。我们后面会看到TaskSchedulerImpl会遍历当前需要调度的所有TaskSet尝试进行调度而不是某一个TaskSet。 resourceOffers()代码如下所示: ---------------------------------------- TaskSchedulerImpl ------------------------------def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] synchronized {// Mark each slave as alive and remember its hostname// Also track if new executor is addedvar newExecAvail falsefor (o - offers) { // 对于每一个WorkerOfferif (!hostToExecutors.contains(o.host)) {hostToExecutors(o.host) new HashSet[String]() // 构建这个Host的Executor的map}// 是否有新增的executor进来。如果有则维护executor相关的map信息if (!executorIdToRunningTaskIds.contains(o.executorId)) {hostToExecutors(o.host) o.executorId // 将这个executor加到这个host - executors 的map中executorAdded(o.executorId, o.host)executorIdToHost(o.executorId) o.host // 将这个host加到executor - host 的毛重executorIdToRunningTaskIds(o.executorId) HashSet[Long]() // 构建这个executor - tasks的mapnewExecAvail true}for (rack - getRackForHost(o.host)) {hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) o.host // 构建rack - hosts的map}}// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do// this here to avoid a separate thread and added synchronization overhead, and also because// updating the blacklist is only relevant when task offers are being made.blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())val filteredOffers blacklistTrackerOpt.map { blacklistTracker offers.filter { offer !blacklistTracker.isNodeBlacklisted(offer.host) !blacklistTracker.isExecutorBlacklisted(offer.executorId)}}.getOrElse(offers)// 将offer进行随机shuffle返回打乱顺序以后的IndexedSeq[WorkerOffer]val shuffledOffers shuffleOffers(filteredOffers)// Build a list of tasks to assign to each worker.// 根据当前的WorkerOffer预构建一个TaskDescription的二维数组val tasks shuffledOffers.map(o new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))val availableCpus shuffledOffers.map(o o.cores).toArray // 构建WorkerOffer - 可用CPU的对应关系val sortedTaskSets rootPool.getSortedTaskSetQueuefor (taskSet - sortedTaskSets) {if (newExecAvail) { // 如果有新的Executor加入进来taskSet.executorAdded() // 重新计算这个TaskSet的locality的相关信息}}// Take each TaskSet in our scheduling order, and then offer it each node in increasing order// of locality levels so that it gets a chance to launch local tasks on all of them.// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANYfor (taskSet - sortedTaskSets) { // 按照调度顺序取出每一个TaskSetvar launchedAnyTask falsevar launchedTaskAtCurrentMaxLocality false// locality从低到高遍历这个TaskSet中的每一个可用的localityLevelslocality越低代表本地性越好for (currentMaxLocality - taskSet.myLocalityLevels) {do {launchedTaskAtCurrentMaxLocality resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)launchedAnyTask | launchedTaskAtCurrentMaxLocality} while (launchedTaskAtCurrentMaxLocality)}if (!launchedAnyTask) { // 没有launch任何一个tasktaskSet.abortIfCompletelyBlacklisted(hostToExecutors)}}if (tasks.size 0) {hasLaunchedTask true}return tasks}resourceOffers()的基本流程如下图所示 其基本流程为: 根据生成的每一个WorkerOfferTaskScheduler会维护executor - host - rack之间的映射关系。刚刚说过resourceOffers()调用有可能来自于Executor的注册因此有必要根据WorkerOffer来更新这个新的Executor和Host以及Rack之间的映射关系 for (o - offers) { // 对于每一个WorkerOfferif (!hostToExecutors.contains(o.host)) {hostToExecutors(o.host) new HashSet[String]() // 构建这个Host的Executor的map}// 是否有新增的executor进来。如果有则维护executor相关的map信息if (!executorIdToRunningTaskIds.contains(o.executorId)) {hostToExecutors(o.host) o.executorId // 将这个executor加到这个host - executors 的map中executorAdded(o.executorId, o.host)executorIdToHost(o.executorId) o.host // 将这个host加到executor - host 的毛重executorIdToRunningTaskIds(o.executorId) HashSet[Long]() // 构建这个executor - tasks的mapnewExecAvail true}for (rack - getRackForHost(o.host)) {hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) o.host // 构建rack - hosts的map}}过滤掉部分不考虑的WorkerOffer比如这个WorkerOffer的Node或者Executor在黑名单中 val filteredOffers blacklistTrackerOpt.map { blacklistTracker offers.filter { offer !blacklistTracker.isNodeBlacklisted(offer.host) !blacklistTracker.isExecutorBlacklisted(offer.executorId)}}.getOrElse(offers)为了避免每次都将Task调度到某一个WorkerOffer上(假如这个WorkerOffer资源足够多)每次调用resourceOffers()的时候都会对WorkerOffer进行一次重新随机排序。注意这个重新排序只是将WorkerOffer重新排序没有将Locality重新排序Locality永远从高到低(值从小到大)进行考虑: // 将offer进行随机shuffle返回打乱顺序以后的IndexedSeq[WorkerOffer]val shuffledOffers shuffleOffers(filteredOffers)// Build a list of tasks to assign to each worker.// 根据当前的WorkerOffer预构建一个TaskDescription的二维数组val tasks shuffledOffers.map(o new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))val availableCpus shuffledOffers.map(o o.cores).toArray // 构建WorkerOffer - 可用CPU的对应关系根据TaskSet的调度策略获取排序以后的TaskSet列表。对于列表中的每一个TaskSet如果这个TaskSet刚刚有新的Host加入需要通过调用TaskSet的executorAdded()方法这个方法主要是重新计算locality信息。上文已经介绍过TaskSet的Locality信息的计算。 TaskSet的调度策略不在本文介绍范围内感兴趣的读者可以自行了解val sortedTaskSets rootPool.getSortedTaskSetQueue // 根据TaskSet调度策略返回排序以后的TaskSet数组 for (taskSet - sortedTaskSets) {if (newExecAvail) { // 如果有新的Executor加入进来taskSet.executorAdded() // 重新计算这个TaskSet的locality的相关信息} }根据排序以后的TaskSet遍历每一个TaskSet基于这个TaskSet的Locality数组对这个TaskSet的Tasks进行调度 for (taskSet - sortedTaskSets) { // 按照调度顺序取出每一个TaskSetvar launchedAnyTask falsevar launchedTaskAtCurrentMaxLocality false// locality从低到高遍历这个TaskSet中的每一个可用的localityLevelslocality越低代表本地性越好for (currentMaxLocality - taskSet.myLocalityLevels) {do {launchedTaskAtCurrentMaxLocality resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)launchedAnyTask | launchedTaskAtCurrentMaxLocality} while (launchedTaskAtCurrentMaxLocality)}if (!launchedAnyTask) { // 没有launch任何一个tasktaskSet.abortIfCompletelyBlacklisted(hostToExecutors)}}上文讲过locality数组的生成过程可以看到locality数组从前到后的Locality从高到低因此这里的逻辑是对于每一个TaskSet尝试先调度Locality最高的Task并且如果当前要求的Locality在指定的超时时间内无法将所Task调度完毕将尝试更低一级的locality进行调度。到最后locality会降低到TaskLocality.ANY即进行任意调度。 所以对于一个TaskSet中的task根据当前要求的TaskLocality进行任务调度发生在方法resourceOfferSingleTaskSet()中。这个方法根据允许的最大的locality(currentMaxLocality 这里的最大指的是最低要求即不可以比这个locality更宽松了)当前可用资源shuffledOffers需要调度的TaskSet返回成功调度的这个TaskSet中的task private def resourceOfferSingleTaskSet(taskSet: TaskSetManager, // 当前的TaskSetmaxLocality: TaskLocality, // 当前最大的locality最大的意思是最优的localityshuffledOffers: Seq[WorkerOffer], // 每一个WorkerOffer代表了一个可用资源availableCpus: Array[Int], // 这个shuffledOffers中的每一个WorkerOffer所代表的可用的VCores// 一个WorkerOffer按照可用cpu以及每个task的cpu算出Task的数量tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean {var launchedTask false// nodes and executors that are blacklisted for the entire application have already been// filtered out by this pointfor (i - 0 until shuffledOffers.size) { // 遍历每一个WorkerOffer(每一个WorkerOffer对应了一个executor)val execId shuffledOffers(i).executorIdval host shuffledOffers(i).hostfor (task - taskSet.resourceOffer(execId, host, maxLocality)) { // 往这个Offer上调度一个Tasktasks(i) task // 保存调度结果val tid task.taskId....launchedTask true}}return launchedTask}从上面的代码可以看到resourceOfferSingleTaskSet()方法会遍历当前所有的可用资源WorkerOffer尝试按照当前的maxLocality调用resourceOffer()方法往这个WorkerOffer上面调度一个task。 resourceOffer()方法是TaskSet的成员方法其根据executorID hostname以及最大允许的locality(maxLocality 即locality不可以再差了)尝试从pendingTask中选出一个满足条件的task调度 def resourceOffer(execId: String, // executor idhost: String, // executor所在的hostmaxLocality: TaskLocality.TaskLocality) // 所容许的locality不能比这个locality更宽松: Option[TaskDescription] { ....val curTime clock.getTimeMillis()// allowedLocality 代表当前最宽松的locality是什么显然在开始的时候我们希望allowedLocality严格一点儿// 后面如果分配失败了再逐渐放松要求var allowedLocality maxLocality// 如果 maxLocality TaskLocality.NO_PREF那么allowedLocality maxLocality// 进入TaskLocality.NO_PREF本来就是对调度没有任何要求if (maxLocality ! TaskLocality.NO_PREF) {allowedLocality getAllowedLocalityLevel(curTime) // 根据当前的时间更新当前时间节点下的allowedLocality这个allowedLocality可能小于maxLocality,这时候就会使用这个更小(更严格)的localityif (allowedLocality maxLocality) { // 如果allowedLocality比maxLocality更松弛那么还是以maxLocality为准// Were not allowed to search for farther-away tasksallowedLocality maxLocality}}// 根据当前计算得到的locality 弹出对应的tasks然后调度起来dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) // Found a task; do some bookkeeping and return a task description.....currentLocalityIndex getLocalityIndex(taskLocality)lastLaunchTime curTime // 更新这个TaskSetManager的最后一个task的启动时间.........addRunningTask(taskId)sched.dagScheduler.taskStarted(task, info)new TaskDescription(.....)}}这个方法的基本逻辑为: 准备计算allowedLocality初始值为maxLocalityvar allowedLocality maxLocality尝试更新allowedLocality 为什么allowedLocality有可能会跟maxLocality不同原因是maxLocality代表了这个TaskSet的locality数组的外层循环(resourceOffers()方法会按照locality从第到高进行Task的分配)但是在延迟调度的场景下在某一个locality level(比如NODE_LOCAL)没有合适资源的情形下在配置的退化时间到来之前不急于将locality进行退化处理即myLocalityLevels(currentLocalityIndex)不变依然为NODE_LOCAL。在下一轮调度重新到来的时候(resourceOffers()方法重新运行)在尝试到maxLocality ALL的时候突然有了可以满足locality NODE_LOCAL的资源这时候getAllowedLocalityLevel()就会返回myLocalityLevels(currentLocalityIndex) NODE_LOCAL因为这个locality level的超时时间还没到进而maxLocality更新为NODE_LOCAL从而在最大运行的locality为NODE_LOCAL(而不是resourceOffers()循环到的位置ALL)当前的约束下进行任务调度 // 如果 maxLocality TaskLocality.NO_PREF那么allowedLocality maxLocality// 进入TaskLocality.NO_PREF本来就是对调度没有任何要求if (maxLocality ! TaskLocality.NO_PREF) {allowedLocality getAllowedLocalityLevel(curTime) // 根据当前的时间更新当前时间节点下的allowedLocality这个allowedLocality可能小于maxLocality,这时候就会使用这个更小(更严格)的localityif (allowedLocality maxLocality) { // 如果allowedLocality比maxLocality更松弛那么还是以maxLocality为准// Were not allowed to search for farther-away tasksallowedLocality maxLocality}}根据当前计算得到的allowedLocality尝试从这个WorkOffer对应的execID,host中弹出一个task并返回结果// 根据当前计算得到的locality 弹出对应的tasks然后调度起来dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) // Found a task; do some bookkeeping and return a task description.....currentLocalityIndex getLocalityIndex(taskLocality)lastLaunchTime curTime // 更新这个TaskSetManager的最后一个task的启动时间.........addRunningTask(taskId)sched.dagScheduler.taskStarted(task, info)new TaskDescription(.....)}可以看到TaskSetManager.getAllowedLocalityLevel()方法根据当前时间计算当前真正的允许的locality其核心原理为每一个TaskSet都维护了currentLocalityIndexmyLocalityLevels[currentLocalityIndex]就对应了这个TaskSet当前应该使用的Locality级别。只有在某种条件下currentLocalityIndex索引会通过自增的方式向前移动以移动到下一个Locality(即Locality变得更松弛)即getAllowedLocalityLevel()方法控制了currentLocalityIndex移动条件保证currentLocalityIndex只有在某些条件下才能移动到下一级。移动到下一级的条件为 当前currentLocalityIndex对应的locality的pendingTasks下面已经没有task自然应该移动到下一个Locality这个很容易理解即当前的locality的所有task已经处理完了。虽然当前currentLocalityIndex对应的locality的pendingTasks下面有task没处理完但是基于该locality已经有很长时间无法launch一个task了(即当前时间已经超过了这个TaskSet的上一次成功launch一个task的时间)没有必要继续再尝试这个Locality了只好移动到下一个Locality(Locality 降级)。 具体代码如下所示: ------------------------------------ TaskSetManager --------------------------------------------private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality {// Remove the scheduled or finished tasks lazilydef tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean {....}def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean {......}// 循环遍历所有位置级别直到达到最后一级currentLocalityIndex从小到大本地性由强到弱while (currentLocalityIndex myLocalityLevels.length - 1) {// 查看当前的locality下面是否有task还没有运行val moreTasks myLocalityLevels(currentLocalityIndex) match {case TaskLocality.PROCESS_LOCAL moreTasksToRunIn(pendingTasksForExecutor)case TaskLocality.NODE_LOCAL moreTasksToRunIn(pendingTasksForHost)case TaskLocality.NO_PREF pendingTasksWithNoPrefs.nonEmptycase TaskLocality.RACK_LOCAL moreTasksToRunIn(pendingTasksForRack)}if (!moreTasks) {// 如果当前locality级别没有更多的任务将lastLaunchTime更新为当前时间并将位置级别索引增加1。lastLaunchTime curTimecurrentLocalityIndex 1 // 查看下一级locality} else if (curTime - lastLaunchTime localityWaits(currentLocalityIndex)) {// 尽管这个localityWaits(currentLocalityIndex) 的locality上还有对应的pending task// 但是达到等待时间的情况会进入下一个localitylastLaunchTime localityWaits(currentLocalityIndex)currentLocalityIndex 1 // 查看下一级locality} else {// 在myLocalityLevels(currentLocalityIndex)的locality下还有更多的task并且没有超时那么就返回这个localityreturn myLocalityLevels(currentLocalityIndex)}}myLocalityLevels(currentLocalityIndex)}其中TaskSetManager中的lastLaunchTime维护了这个TaskSet中上一次启动一个Task的时间即只要我们在满足allowedLocalityLevel的条件下在某一个WorkOffer上启动了一个Task那么就将这个TaskSet的lastLaunchTime更新为当前时间。这样如果一个TaskSet上长时间无法launch一个Task那么这个allowedLocalityLevel就会自增即降低Locality Preference的要求。 通过TaskSetManager中getLocalityWait()方法可以看到我们可以为每一个TaskLocality配置不同的超时时间 ------------------------------------ TaskSetManager -------------------------------------------private def getLocalityWait(level: TaskLocality.TaskLocality): Long {val defaultWait conf.get(config.LOCALITY_WAIT)val localityWaitKey level match {case TaskLocality.PROCESS_LOCAL spark.locality.wait.processcase TaskLocality.NODE_LOCAL spark.locality.wait.nodecase TaskLocality.RACK_LOCAL spark.locality.wait.rackcase _ null}if (localityWaitKey ! null) {conf.getTimeAsMs(localityWaitKey, defaultWait.toString)} else {0L}}在获取了最后的allowedLocalityLevel以后就尝试通过dequeueTask(execId, host, allowedLocality)方法根据当前WorkOffer的信息(execID, host)以及allowedLocalityLevel从对应的pendingTask中取出符合要求的task这里的符合要求指的是这个Task的locality偏好是允许在这个hostexecutorID上运行的并且如果在这个host executorID上运行其locality是不会比maxLocality更差的。即 dequeueTask()的逻辑如下所示。它根据当前的WorkerOffer(Executor ID hostname)和允许的最大的locality(maxLocality)返回一个locality满足要求的Task准备运行。这里的满足要求指的是这个Task如果运行在这个WorkerOffer上能够满足所允许的最差的locality(maxLocality)的要求。 ------------------------------------ TaskSetManager --------------------------------------------private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] {// TaskLocality.isAllowed(maxLocality, TaskLocality.PROCESS_LOCAL) 永远返回true因此不做判断for (index - dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {return Some((index, TaskLocality.PROCESS_LOCAL, false))}// 假如maxLocality允许NODE_LOCAL那么就从getPendingTasksForHost中取出对应的Host的task返回if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {for (index - dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {return Some((index, TaskLocality.NODE_LOCAL, false))}}// 假如maxLocality允许NO_PREF那么就从pendingTasksWithNoPrefs中取出对应的task返回if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {// Look for noPref tasks after NODE_LOCAL for minimize cross-rack trafficfor (index - dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {return Some((index, TaskLocality.PROCESS_LOCAL, false))}}// 假如maxLocality允许RACK_LOCAL那么就获取当前的host的rack从对应的rack的pendingTasks中取出对应的task返回if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {for {rack - sched.getRackForHost(host)index - dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))} {return Some((index, TaskLocality.RACK_LOCAL, false))}}if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {for (index - dequeueTaskFromList(execId, host, allPendingTasks)) {return Some((index, TaskLocality.ANY, false))}}// find a speculative task if all others tasks have been scheduleddequeueSpeculativeTask(execId, host, maxLocality).map {case (taskIndex, allowedLocality) (taskIndex, allowedLocality, true)}}示例1 当前的WorkerOffer信息如下: Hostname: testhost1.corp.comExecutorID: 25536 当前的maxLocality NODE_LOCAL testhost1-rack1.corp.com所在的rack为rack1 在当前的TaskSet中pendingTasksForExecutor中有一个Task A其preference正好是 Executor 25535。因此根据上文所讲解的pendingTask的添加过程Task A会同时存在于pendingTasksForExecutorpendingTasksForHostpendingTasksForRack以及allTasks中。 这时候dequeueTask()的运行过程为: 由于pendingTasksForExecutor中没有针对Executor 25536的Task A因此返回Task ATask A的实际locality是更好的PROCESS_LOCAL满足了maxLocality NODE_LOCAL的要求 示例2 当前的WorkerOffer信息如下: Hostname: testhost1.corp.comExecutorID: 25536 当前的maxLocality NODE_LOCAL testhost1-rack1.corp.com所在的rack为rack1 在当前的TaskSet中pendingTasksForExecutor中没有针对ExecutorID: 25536 的pending Task。但是在pendingTasksForHost中有一个Task B 其 locality preference正好是Hostname: testhost1.corp.com。 根据上文所讲解的pendingTask的添加过程Task B会同时存在于pendingTasksForHostpendingTasksForRack以及allTasks中。 这时候dequeueTask()的运行过程为: 由于pendingTasksForExecutor中没有针对Hostname: testhost1.corp.com因此跳过由于pendingTasksForHost中有针对Hostname testhost1.corp.com的Task因此取出并返回Task B此时的实际locality正好等于maxLocality为 NODE_LOCAL 示例3 当前的WorkerOffer信息如下: Hostname: testhost1.corp.comExecutorID: 25536 当前的maxLocality RACK_LOCAL testhost1-rack1.corp.com所在的rack为rack1 在当前的TaskSet中pendingTasksForExecutor中没有针对ExecutorID: 25536 的pending Task。但是在pendingTasksForHost中有一个Task B 其 locality preference是Hostname: testhost2-rack1.corp.com即它的locality preference同当前的WorkerOffer不在一个节点但是在一个机架上。 根据上文所讲解的pendingTask的添加过程Task B会同时存在于pendingTasksForHostpendingTasksForRack以及allTasks中。 这时候dequeueTask()的运行过程为: 由于pendingTasksForExecutor中没有针对Hostname: testhost1.corp.com因此跳过由于pendingTasksForHost中有针对Hostname testhost1.corp.com的Task因此跳过由于pendingTasksForHost中有针对rack1的Task B因此取出并返回Task B此时的实际locality是RACK_LOCAL 示例4 当前的WorkerOffer信息如下: Hostname: testhost1.corp.comExecutorID: 25536 当前的maxLocality ANY testhost1-rack1.corp.com所在的rack为rack1 在当前的TaskSet中pendingTasksForExecutor中没有针对ExecutorID: 25536的的pending Task。pendingTasksWithNoPrefs也没有Task。但是在pendingTasksForHost中有一个Task D 其 locality preference是另外一个Hostname: testhost2-rack2.corp.com(这个testhost2所在的rack为rack2)。 根据上文所讲解的pendingTask的添加过程Task D会同时存在于pendingTasksForHostpendingTasksForRack以及allTasks中。 这时候dequeueTask()的运行过程为: 由于pendingTasksForExecutor中没有针对ExecutorID: 25536因此跳过由于pendingTasksForHost中没有针对Hostname testhost2-rack2.corp.com的Task因此跳过由于pendingTasksWithNoPrefs中没有任何Task因此跳过由于getPendingTasksForRack中没有针对rack1的任何task因此跳过在allPendingTasks中取出Task D满足maxLocality NODE_LOCAL 要求因此取出并返回Task D此时实际的locality正好等于maxLocality为ANY。 结语 Spark在Yarn上的资源管理是粗粒度的资源管理(Coarse Grained)即资源和Task并不一一对应。ApplicationMaster作为资源请求的代理充当了细粒度的Task和粗粒度的Yarn Container之间的桥梁即根据细粒度的、全量的Task资源需求状态不断生成增量的、粗粒度的资源请求并将Yarn不断异步返回的资源和当前的Task进行匹配以最优化Task的放置。 本文全面地讲解了整个资源请求和调度过程。读者从中可以看到基于Yarn的整个资源调度过程Spark的不同角色之间的通信过程整个过程虽然是以Yarn为基础但是其反映的是一个通用的资源调度决策的基本思路因此很有参考意义。 我觉得感兴趣的读者可以和Spark on K8S这种细粒度的资源调度过程作比较将会有更多的收益。
http://www.hkea.cn/news/14543176/

相关文章:

  • 莞城建设网站网站建设公司哪家好速找盛世传媒
  • 网站建设中备案遵义住房和城乡建设局官方网站
  • 定制手机网站网站建设哪些职位
  • 电商网站特点怎样做加入购物车的网站
  • 杭州网站制作专业做家教在哪个网站
  • 宝安公司免费网站建设360网站运营
  • 高端网站设计公司排行榜网站图片加载 优化
  • 网站UI怎么做内网怎么做网站服务器
  • 苏州注册公司网上核名江门seo外包服务
  • 韩国电商网站网站备案服务类型
  • 网站系统建设费用安徽合肥紧急通报
  • 观山湖制作网站福州外贸网站建设
  • 网站建设湛江开源的公司网站
  • ps做素材下载网站汕头刚刚发生的事
  • 1号店网站模板下载手机建网站需要多少钱
  • 比较好的网站开发框架今天最新新闻事件报道
  • 自己做的网站打开太慢网站信息系统
  • 网站建设合同英文广东省建设厅官网查询
  • 游戏网站建设计划书wordpress中文英文切换
  • 做的网站提示不安全问题帝国网站 教程
  • 度假村网站建设关键词的分类和优化
  • 网站备案升级一个工厂的网站建设
  • 沈阳做网站的公司推荐自己做电影网站需要的成本
  • 电商网站怎么做西安seo经理
  • 长春网站排名优化费用笑话网站php程序
  • 网站建设专家是干嘛的暴雪网易最新消息
  • 淘宝网官方网站购物商城wordpress 标签 修改
  • 南阳做网站优化价格wordpress用户批量注册
  • 建站公司网站源码社区wordpress绑定新域名以后404
  • 长春网站怎么推广Wordpress去掉左上角标志