任务调度-源码分析

//包装成一个任务级进行提交
taskScheduler.submitTasks(new TaskSet(
       tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
       stage.resourceProfileId))

TaskSchedulerImpl.submitTasks

//任务集管理器
private[scheduler] def createTaskSetManager(
      taskSet: TaskSet,
      maxTaskFailures: Int): TaskSetManager = {
    new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
  }
//调度buid加入管理器
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

调度器初始化

 def initialize(backend: SchedulerBackend): Unit = {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

FIFOSchedulableBuilder.addTaskSetManager

  override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit = {
   //任务池
    rootPool.addSchedulable(manager)
  }

backend.reviveOffers()
CoarseGrainedSchedulerBackend.reviveOffers

 override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
     //给自己发送ReviveOffers消息
    driverEndpoint.send(ReviveOffers)
  }

自己接收消息

 override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data, resources) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
          ... ...

      case ReviveOffers =>
      //接收到ReviveOffers消息
        makeOffers()

CoarseGrainedSchedulerBackend.makeOffers

private def makeOffers(): Unit = {
      // Make sure no executor is killed while some task is launching on it
     //得到任务的描述信息
      val taskDescs = withLock {
           ... ...
                (rName, rInfo.availableAddrs.toBuffer)
              }, executorData.resourceProfileId)
        }.toIndexedSeq
        //调度任务,从任务池里取任务 执行
        scheduler.resourceOffers(workOffers, true)
      }
      if (taskDescs.nonEmpty) {
        //任务运行
        launchTasks(taskDescs)
      }
    }

resourceOffers

    val sortedTaskSets = rootPool.getSortedTaskSetQueue
//判断本地化级别
 for (currentMaxLocality <- taskSet.myLocalityLevels) {
          var launchedTaskAtCurrentMaxLocality = false
          do {
            val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
              taskSet, currentMaxLocality, shuffledOffers, availableCpus,
              availableResources, tasks, addressesWithDescs)
            launchedTaskAtCurrentMaxLocality = minLocality.isDefined
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
            noDelaySchedulingRejects &= noDelayScheduleReject
            globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
          } while (launchedTaskAtCurrentMaxLocality)
        }
 override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
//跟据调度算法进行manager排序   schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue.filter(_.isSchedulable)
    }
    sortedTaskSetQueue
  }


 private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
   //不同的调度模式 有不同算法
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
      case _ =>
        val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
        throw new IllegalArgumentException(msg)
    }
  }

最终拿到任务就开始执行了
launchTasks

 private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
    遍历每一个任务描述
     for (task <- tasks.flatten) {
       val serializedTask = TaskDescription.encode(task)
     //是否task序列化的size超出限制
       if (serializedTask.limit() >= maxRpcMessageSize) {
         Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
           try {
             var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
               s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
               s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
             msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
             taskSetMgr.abort(msg)
           } catch {
             case e: Exception => logError("Exception in error callback", e)
           }
         }
       }
       else {
         val executorData = executorDataMap(task.executorId)
         // Do resources allocation here. The allocated resources will get released after the task
         // finishes.
         val rpId = executorData.resourceProfileId
         val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
         val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
         executorData.freeCores -= taskCpus
         task.resources.foreach { case (rName, rInfo) =>
           assert(executorData.resourcesInfo.contains(rName))
           executorData.resourcesInfo(rName).acquire(rInfo.addresses)
         }

         logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
           s"${executorData.executorHost}.")
//找到对应executor的终端,发送LaunchTask消息
         executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
       }
     }
   }

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容