// Wrap the tasks into a TaskSet and submit it through the task scheduler.
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
stage.resourceProfileId))
TaskSchedulerImpl.submitTasks
// Builds the TaskSetManager for the given task set.
// The manager is constructed with this scheduler, the task set, the failure limit,
// the optional health tracker and the scheduler's clock.
private[scheduler] def createTaskSetManager(
    taskSet: TaskSet,
    maxTaskFailures: Int): TaskSetManager =
  new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
// Register the manager (together with its task set's properties) with the schedulable builder.
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
调度器初始化
// Stores the backend, chooses the schedulable builder that matches the configured
// scheduling mode, and then lets that builder build its pools.
// Throws IllegalArgumentException for any mode other than FIFO or FAIR.
def initialize(backend: SchedulerBackend): Unit = {
  this.backend = backend
  // Resolve the builder first; an unsupported mode fails before anything is assigned.
  val chosenBuilder = schedulingMode match {
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
    case _ =>
      throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
        s"$schedulingMode")
  }
  schedulableBuilder = chosenBuilder
  schedulableBuilder.buildPools()
}
FIFOSchedulableBuilder.addTaskSetManager
// Adds the manager straight to the root pool (the task pool).
// `properties` is not consulted by this implementation.
override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit =
  rootPool.addSchedulable(manager)
// Ask the scheduler backend to revive offers.
backend.reviveOffers()
CoarseGrainedSchedulerBackend.reviveOffers
// Posts a ReviveOffers message to the driver endpoint, i.e. the backend sends the
// message to itself. The send is wrapped in Utils.tryLogNonFatalError.
override def reviveOffers(): Unit =
  Utils.tryLogNonFatalError(driverEndpoint.send(ReviveOffers))
DriverEndpoint 接收自己发送的 ReviveOffers 消息(receive)
// Message handler of the endpoint (excerpt: parts of the body are elided with "... ...").
override def receive: PartialFunction[Any, Unit] = {
// Task status report: forward the new state and payload to the scheduler.
case StatusUpdate(executorId, taskId, state, data, resources) =>
scheduler.statusUpdate(taskId, state, data.value)
// When the task is in a finished state, look up the reporting executor's data (handling elided).
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
... ...
case ReviveOffers =>
// ReviveOffers message received: make resource offers.
makeOffers()
CoarseGrainedSchedulerBackend.makeOffers
// Obtains task descriptions from the scheduler and launches them if there are any
// (excerpt: the construction of workOffers is elided with "... ...").
private def makeOffers(): Unit = {
// Make sure no executor is killed while some task is launching on it
// Obtain the task descriptions; the whole computation runs inside withLock.
val taskDescs = withLock {
... ...
(rName, rInfo.availableAddrs.toBuffer)
}, executorData.resourceProfileId)
}.toIndexedSeq
// Schedule tasks: the scheduler takes tasks from the task pool for the given offers.
scheduler.resourceOffers(workOffers, true)
}
if (taskDescs.nonEmpty) {
// Run the scheduled tasks.
launchTasks(taskDescs)
}
}
TaskSchedulerImpl.resourceOffers
// Excerpt from resourceOffers; the surrounding method and its local variables are not shown here.
// Fetch the task set managers from the root pool in scheduling order.
val sortedTaskSets = rootPool.getSortedTaskSetQueue
// Go through the locality levels of the task set one by one.
for (currentMaxLocality <- taskSet.myLocalityLevels) {
var launchedTaskAtCurrentMaxLocality = false
// Keep offering at this locality level for as long as the previous round launched a task.
do {
val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus,
availableResources, tasks, addressesWithDescs)
// A defined minLocality is treated as "a task was launched in this round".
launchedTaskAtCurrentMaxLocality = minLocality.isDefined
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
noDelaySchedulingRejects &= noDelayScheduleReject
globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
} while (launchedTaskAtCurrentMaxLocality)
}
// Returns the schedulable task set managers of this pool in scheduling order.
// The child schedulables are sorted with the comparator of the configured scheduling
// algorithm; each child then contributes its own sorted queue, keeping only the
// managers that report isSchedulable.
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  // Sort the managers according to the scheduling algorithm.
  // This expression must stay on its own line: when it shares a line with a `//`
  // comment it is commented out and the val binds to the following `for` loop.
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue.filter(_.isSchedulable)
  }
  sortedTaskSetQueue
}
// Scheduling algorithm used by this pool; each scheduling mode has its own algorithm.
// Any mode other than FAIR or FIFO is rejected with an IllegalArgumentException.
private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = schedulingMode match {
  case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm()
  case SchedulingMode.FAIR => new FairSchedulingAlgorithm()
  case _ =>
    throw new IllegalArgumentException(
      s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead.")
}
最终拿到任务就开始执行了
CoarseGrainedSchedulerBackend.launchTasks
// Launches the given tasks on their executors.
// Each task description is serialized first. If the serialized form reaches the
// maximum RPC message size, the owning task set is aborted; otherwise the task's
// CPUs and resource addresses are taken from the executor's bookkeeping and a
// LaunchTask message is sent to that executor's endpoint.
//
// tasks: task descriptions grouped in nested sequences; they are flattened here.
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  // Iterate over every task description.
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    // Does the serialized task exceed the size limit?
    if (serializedTask.limit() >= maxRpcMessageSize) {
      // The lookup may yield null, hence the Option wrapper.
      Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
        try {
          val msg = ("Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
            s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values.")
            .format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      val executorData = executorDataMap(task.executorId)
      // Do resources allocation here. The allocated resources will get released after the task
      // finishes.
      val rpId = executorData.resourceProfileId
      val prof = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
      val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
      executorData.freeCores -= taskCpus
      task.resources.foreach { case (rName, rInfo) =>
        assert(executorData.resourcesInfo.contains(rName))
        executorData.resourcesInfo(rName).acquire(rInfo.addresses)
      }
      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")
      // Send the LaunchTask message to the endpoint of the target executor.
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}