Spark 动态资源分配下数据本地性导致的作业运行缓慢

CoarseGrainedSchedulerBackend 以 spark.scheduler.revive.interval 默认1s调用makeoffers(), 在分配到的executor上调度task;makeoffers() 中scheduler.resourceOffers(workerOffers)产生可执行的task策略,包含task到executor的映射;
每一个stage对应的tasks都由一个TaskSetManager管理,分配策略由以下生成:

for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels){
  do {
    launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers,   availableCpus, tasks)
  } while (launchedTask)}

其中myLocalityLevels是对应taskSet中所包含的所有本地性偏好级别,包括PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY;
shuffledOffers是对executor offer的随机化处理,taskSet整个分配过程是两层for循环:
第一层for循环就是上面的maxLocality.taskSet.myLocalityLevels;
第二层for循环在resourceOfferSingleTaskSet,对offer中的每一个executor进行判断,是否有一个task能够满足本地性偏好,和executor绑定一起,形成一个执行略;
其实还有第三层for循环,就是dequeueTask;
重点看TaskSetManager::resourceOffer:
一个关键的问题就是如何通过延迟机制保证数据本地行,其实现方法就在getAllowedLocalityLevel,spark关于延迟调度由三个参数:
spark.locality.wait.process, spark.locality.wait.node, spark.locality.wait.rack, 默认3s;
TaskSetManager中记录了lastLaunchTime,如果当前时间减去lastLaunchTime大于上面的值,对应getAllowedLocalityLevel就返回允许的本地偏好级别;
在遍历过程中,第一层的for循环的locality和getAllowedLocalityLevel的返回值取最小值,然后执行dequeueTask,如果dequeueTask如果返回Some(_),则更新lastLaunchTime和currentLocalityIndex;

这就带来一个问题,我们举一个极端的例子:
有100个executor和100个task,每两个executor一个node,每四个executor一个rack,100个task的本地便好全都是到executor1的process,则整个调度过程如下:

  1. 第一次dequeue task1到executor1,之后executor2~executor100的遍历,由于本地性原因,全部调度失败,并且dequeueTask导致currentLocalityIndex=0;
  2. 3s过后,currentLocalityIndex加1,getAllowedLocality返回NODE_LOCAL,导致task2被调度到executor2,但是executor3~executor100均调度失败;
  3. 2s过后假如executor1执行task1结束,executor1参与调度,task3成功调度到executor1;
  4. dequeueTask返回Some(_),currentLocalityIndex=0,lastLaunch=curTime,其他executor调度失败;
  5. 1s过后,类似步骤2的情况再次发生;
  6. 假如上米娜四步循环发生,会导致长时间的executor处于idle状态,默认60s,idle的executor被系统释放掉,
  7. stage被拖死;

应对方法:

  1. 调整currentLocalityIndex和lastLaunchTime的更新策略,能够提高task的调度效率;
  2. 减少spark.locaity.wait;

以上两点均以牺牲数据本地性为代价。

补充写一点Spark关于本地偏好的机制,Spark通过RDD的依赖关系拓扑图来描述整个一个Job的计算过程,整个拓扑图通过shuffle dependency来划分出各个stage,我们说一个stage就是从一个shuffle-read开始到一个shuffle-write,task运行的executor距离shuffle-read(或者读取dfs, cache)数据物理距离越近,本地性就越强,那hdfs距离,RDD的HadoopPartition内部就描述着split的位置信息,而这样的信息会在DAGScheduler.submitMissingTasks时通过listenerBus以SparkListenerStageSubmitted的形式通知给ExecutorAllocationManager,ExecutorAllocation据此向ExecutorAllocationClient指示最终通过YarnAllocator如何申请executor,申请获得的executor以offer的形式最终分配给TaskScheduler,offer和task最终在TaskSetManager内部完成匹配;

TaskSetManager内有几个存储结构,

pendingTasksForExecutor, 
pendingTasksForHost, 
pendingTasksForRack, 
pendingTasksWithNoPrefs, 
allPendingTasks;

由低到高,假如一个task存在于pendingTasksForExecutor,它一定存在于其他四种,相应的key就是executor所属的Host,Rack等;上面四个集合在tasks初始化的时候就根据task的location preference确定了,TaskSetManager只要根据offer内的executor,按照延迟分配的策略,匹配出对应的task即可,当然,也有可能由于本地性的原因,无法匹配出任何task;

下面终点讲一下延迟匹配的函数:

private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
......
  while (currentLocalityIndex < myLocalityLevels.length - 1) {    
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {      
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)      
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)      
      case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty      
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)     
    }    
    if (!moreTasks) {      
      // This is a performance optimization: if there are no more tasks that can      
      // be scheduled at a particular locality level, there is no point in waiting     
      // for the locality wait timeout (SPARK-4939).      
      lastLaunchTime = curTime      
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")      currentLocalityIndex += 1    
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {      
      // Jump to the next locality level, and reset lastLaunchTime so that the next locality      
      // wait timer doesn't immediately expire      
      lastLaunchTime += localityWaits(currentLocalityIndex)      
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +        s"${localityWaits(currentLocalityIndex)}ms")      
      currentLocalityIndex += 1    } 
    else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}

延迟策略的关键就是currentLocalityIndex的变化,终点是上面的else if,这里面一个地方有一些微妙,就是四个级别的先后顺序是process, node, nopref, rack, any,有人可能会猜测nopref会被rack先选中,其实不可能,因为nopref对应的wait时间默认是0s,所以while循环内,在遍历到node之后,会自动遍历通过nopref,并进入下一次遍历到rack,所以如果我们本地性偏好几种在rack,就可以把所有的wait值设成0,然后rack的wait值设成1~3s,这样能够缓解因为本地偏好到来的调度效率低下,极端情况下还是不能避免上面举出的例子,但是生产情况下应该会好很多。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容