Tekton 任务调度解析

  • 引言
  • 问题分析
  • Tekton 的实现
    数据结构
    构造 Graph
    获取调度节点
  • 总结

引言

Tekton 在执行用户定义的任务后,根据任务中的先后顺序,来执行不同的任务,但是多个任务之间可能会存在较多个关联和顺序关系,这个时候对所有的 task 进行排序并且调度下一个 task 就会变得比较复杂。我们一起看下 Tekton 的开发者们是如何解决这个问题的。

问题分析

在看 Tekton 如何解决这件事情之前,我们先自己来大概分析和理解下这个调度问题。

一个比较实际的场景是,用户在编排流水线的时候,会通过在 pipeline 中定义的 runAfter 完成任务顺序的指定,而且如果 A task 中的 resource.input 依赖于 B task 的 resourc.output,这样的话我们也需要保证 B task 应该在 A task 之后再执行。

如此想来,这个调度问题就可以简化为我们有多个节点,节点间存在依赖关系和顺序关系,需要保证所有的节点都能按照依赖关系和顺序关系来进行排序,并且可以根据当前所有的节点状态返回下次需要执行的节点。

Tekton 的实现

Tekton 使用了 DAG(Directed acyclic graph)也就是有向无环图来解决了这个问题。

通过遍历所有的任务,分析任务中的 dep,来将所有的任务插入到图中,并且在需要进行任务调度的时候,分析遍历该图,取出合适的 task 进行调度。

下面来了解下这个实现方式。

数据结构

Tekton 在实现中定义了 Node 还有 Graph 两个 struct,分别对应了有向无环图中的节点还有有向无环图。

Node 的定义有些像二叉树,结构比较简单,存储了一个当前的 task,以及在 Prev 中存储了在执行该节点之前,所有需要完成的节点。Next 中则相反存储的是所有在该节点之后执行的节点。

Graph 是一个针对所有节点的一个 Map。


// Node represents a Task in a pipeline.
type Node struct {
  // Task represent the PipelineTask in Pipeline
  Task Task
  // Prev represent all the Previous task Nodes for the current Task
  Prev []*Node
  // Next represent all the Next task Nodes for the current Task
  Next []*Node
}

// Graph represents the Pipeline Graph
type Graph struct {
  // Nodes represent map of PipelineTask name to Node in Pipeline Graph
  Nodes map[string]*Node
}

构造 Graph

这个就主要对应了该文件中的一个 Build 方法,该方法接收 pipeline 中所有的task,以及存储了所有 task 依赖信息的一个 map[string][]string 结构体,该结构体中存储了所有的 task 以及对应的依赖。

该方法通过这两个参数来完成整个图的构造。

// Build returns a valid pipeline Graph. Returns error if the pipeline is invalid
func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
  d := newGraph()

  // Add all Tasks mentioned in the `PipelineSpec`
  // 增加所有的 node 进入 graph 中
  for _, pt := range tasks.Items() {
    if _, err := d.addPipelineTask(pt); err != nil {
      return nil, fmt.Errorf("task %s is already present in Graph, can't add it again: %w", pt.HashKey(), err)
    }
  }


  // Process all from and runAfter constraints to add task dependency
  // 将所有的 dep 塞到对应的节点中
  for pt, taskDeps := range deps {
    for _, previousTask := range taskDeps {
      // 将每一个 pre 节点  都处理下  放到每个  node  的里面
      if err := addLink(pt, previousTask, d.Nodes); err != nil {
        return nil, fmt.Errorf("couldn't add link between %s and %s: %w", pt, previousTask, err)
      }
    }
  }
  return d, nil
}

在构造这个图的时候,我们先考虑下需要实现的目标有哪些。

我们应该将 Graph 中所有节点中的 Pre 和 Next 节点都补充上,并且需要保证我们在补充节点的时候,不会导致出现环状图,因此在插入节点的时候还需要做环的检测。

其中主要处理的逻辑如下:

// prev:deps 中的key   next:deps 中的某个key节点对应的所有节点中的某个
func linkPipelineTasks(prev *Node, next *Node) error {
  // Check for self cycle
  if prev.Task.HashKey() == next.Task.HashKey() {
    return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey())
  }
  // Check if we are adding cycles.
  path := []string{next.Task.HashKey(), prev.Task.HashKey()}
  if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil {
    return fmt.Errorf("cycle detected: %w", err)
  }
  // 为 dep 中的 key 节点增加 prev 节点,增加的就是 dep 中 deps 的某个节点
  next.Prev = append(next.Prev, prev)
  // 为 dep 中的 deps 节点增加 next 节点,增加的也就是 dep 中的 key 节点
  prev.Next = append(prev.Next, next)
  return nil
}

// nodes: 理解为校验节点中的所有 Pre 节点   path:当前路径 用于错误打印   next: 当前校验节点
func lookForNode(nodes []*Node, path []string, next string) error {
  // 对该 node 的 prevs 进行处理
  for _, n := range nodes {
    path = append(path, n.Task.HashKey())
    if n.Task.HashKey() == next {
      return errors.New(getVisitedPath(path))
    }
    // 因为涉及到环的检测 所以需要对所有的 pre 中的 pre 也要做处理
    // 确保我们的 prev 在其他的节点中是不存在的  否则就是出现了环
    if err := lookForNode(n.Prev, path, next); err != nil {
      return err
    }
  }
  return nil
}

linkPipelineTasks 主要调用 lookForNode 来检测是否存在的环,如果没有存在环,则对 dep 中的 key 节点增加对应的 deps 中的某个节点作为 pre 节点,并且为 deps 中的对应节点增加 dep 中的 key 节点作为 next 节点。

lookForNode 对环的检测也比较有意思,通过递归的方式来完成,主要是检查当前处理节点中的所有 Pre 节点中,是否存在和当前节点相同的节点,通过递归,不仅会检查当前节点的 Pre 节点是否存在,还会对 Pre 节点中的 Pre 节点进行校验,保证从 Root 节点到该节点不会存在相同的节点,也就相当于做了环的检测。

在对所有的节点循环处理结束后,Graph 中所有的节点的 Pre 节点以及 Next 节点都会包含我们最开始期望的所有应该存在的节点。

获取调度节点

Graph 构造完成,在任务调度的时候,我们需要从 Graph 中依次获取到下个需要执行的任务。

大概的思考一下,如果我们来实现这个功能,应该会怎么做呢?

大体思路应该是遍历所有的节点,将节点中 Pre 节点的个数为 0 的节点或者 Pre 节点中的 task 都结束的这种节点拿出来,这个节点就是应该执行的节点。Tekton 的实现思路也大概类似,我们来看下 Tekton 的具体实现。

在Tekton的实现中, GetSchedulable 负责获取下一个节点的工作,为了简单起见,我省略了一些检测的代码。

// GetSchedulable returns a map of PipelineTask that can be scheduled (keyed
// by the name of the PipelineTask) given a list of successfully finished doneTasks.
// It returns tasks which have all dependencies marked as done, and thus can be scheduled. If the
// specified doneTasks are invalid (i.e. if it is indicated that a Task is
// done, but the previous Tasks are not done), an error is returned.
func GetSchedulable(g *Graph, doneTasks ...string) (sets.String, error) {
  roots := getRoots(g)
  tm := sets.NewString(doneTasks...)
  d := sets.NewString()

  visited := sets.NewString()
  // 对所有的根  进行循环处理  因为可能会存在较多的可以调度的节点
  for _, root := range roots {
    // 找出可以调度的节点出来
    schedulable := findSchedulable(root, visited, tm)
    for _, task := range schedulable {
      d.Insert(task.HashKey())
    }
  }

  ...
  return d, nil
}

我们可以看出,该函数会获取到 DAG 图和对应的已完成任务的列表,通过这两个信息来获取应该调度的任务。

在这里先获取到了所有的 root,这里是个数组(因为有可能某个 pipeline 可能会存在多个 root 的节点,比如第一个步骤就是一个并行的步骤),之后会对该 root 调用 findSchedulable 方法,这个方法主要是完成了节点的查找。

这个方法也用到了递归的方式,处理逻辑也比较简单明确,最终返回结果就是参数中 n 节点下,所有的可以调度的节点。

// n: 处理节点  visited: 已经访问过的节点  doneTasks: 标记为已完成的节点
func findSchedulable(n *Node, visited sets.String, doneTasks sets.String) []Task {
  if visited.Has(n.Task.HashKey()) {
    return []Task{}
  }
  // 为了防止重复的拜访
  visited.Insert(n.Task.HashKey())

  // 如果这个节点已经标记为结束了  则去查找 next 节点  从 next 节点中查找到对应的应该调度的节点
  // 这里是为了对下面的所有的 next node 都进行处理  所以使用递归是最合适的方式
  if doneTasks.Has(n.Task.HashKey()) {
    schedulable := []Task{}
    // This one is done! Take note of it and look at the next candidate
    for _, next := range n.Next {
      if _, ok := visited[next.Task.HashKey()]; !ok {
        schedulable = append(schedulable, findSchedulable(next, visited, doneTasks)...)
      }
    }
    return schedulable
  }
  // 如果这个节点没有被标记为结束
  // 这里的处理 才是递归结束的地方
  // This one isn't done! Return it if it's schedulable
  if isSchedulable(doneTasks, n.Prev) {
    // FIXME(vdemeester)
    return []Task{n.Task}
  }
  // This one isn't done, but it also isn't ready to schedule
  return []Task{}
}

因为节点被访问后,无需再被处理,因此传入了 visited 来标记出哪些节点已经访问过,已经访问过的节点不再访问,防止在递归访问中出现多次访问,导致最后返回可调度节点中出现重复的节点。

之后就是我们需要找到的 递归继续的条件 和 递归的终止条件。

递归的继续条件是,如果当前节点已经被标记为完成,则需要对该节点的 next 节点进行递归查询,直到最终将该 root 下所有的可调度节点都返回回来。

递归的终止条件就是如果当前节点,他的所有 Pre 节点都已经完成,则该节点应该就是可以被调度的节点,这种情况下我们直接返回该节点。否则返回为空节点。

通过该方式,可以保证每次都能拿到需要调度的节点,实现的简洁又漂亮。

总结

数据结构和算法的威力在这里得到了体现,这里用到了 DAG 这种数据数据结构,并且配合递归完美了解决了流水线调度的问题,代码简短容易理解,基本上所有的处理逻辑跟 DAG 中的遍历逻辑大体一致。

在源代码库中,也配置了比较全面的测试文件,对于阅读源代码,调试代码也有很大的帮助。推荐大家可以阅读下代码,自己运行测试来体验下。

References:

Tekton DAG 实现
有向无环图

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容