Flink 原理与实现:如何生成 StreamGraph

继上文Flink 原理与实现:架构和拓扑概览中介绍了Flink的四层执行图模型,本文将主要介绍 Flink 是如何根据用户用Stream API编写的程序,构造出一个代表拓扑结构的StreamGraph的。

注:本文比较偏源码分析,所有代码都是基于 flink-1.0.x 版本,建议在阅读本文前先对Stream API有个了解,详见官方文档。

StreamGraph 相关的代码主要在 org.apache.flink.streaming.api.graph 包中。构造StreamGraph的入口函数是 StreamGraphGenerator.generate(env, transformations)。该函数会由触发程序执行的方法StreamExecutionEnvironment.execute()调用到。也就是说 StreamGraph 是在 Client 端构造的,这也意味着我们可以在本地通过调试观察 StreamGraph 的构造过程。

Transformation

StreamGraphGenerator.generate 的一个关键的参数是 List<StreamTransformation<?>>。StreamTransformation代表了从一个或多个DataStream生成新DataStream的操作。DataStream的底层其实就是一个 StreamTransformation,描述了这个DataStream是怎么来的。

StreamTransformation的类图如下图所示:


DataStream 上常见的 transformation 有 map、flatmap、filter等(见DataStream Transformation了解更多)。这些transformation会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph。比如 DataStream.map源码如下,其中SingleOutputStreamOperator为DataStream的子类:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
  // 通过java reflection抽出mapper的返回值类型
  TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
      Utils.getCallLocationName(), true);

  // 返回一个新的DataStream,SteramMap 为 StreamOperator 的实现类
  return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
  // read the output type of the input Transform to coax out errors about MissingTypeInfo
  transformation.getOutputType();

  // 新的transformation会连接上当前DataStream中的transformation,从而构建成一棵树
  OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
      this.transformation,
      operatorName,
      operator,
      outTypeInfo,
      environment.getParallelism());

  @SuppressWarnings({ "unchecked", "rawtypes" })
  SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

  // 所有的transformation都会存到 env 中,调用execute时遍历该list生成StreamGraph
  getExecutionEnvironment().addOperator(resultTransform);

  return returnStream;
}

从上方代码可以了解到,map转换将用户自定义的函数MapFunction包装到StreamMap这个Operator中,再将StreamMap包装到OneInputTransformation,最后该transformation存到env中,当调用env.execute时,遍历其中的transformation集合构造出StreamGraph。其分层实现如下图所示:

另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中物理操作。有一些只是逻辑概念,比如 union、split/select、partition等。如下图所示的转换树,在运行时会优化成下方的操作图。

union、split/select、partition中的信息会被写入到 Source –> Map 的边中。通过源码也可以发现,UnionTransformation,SplitTransformation,SelectTransformation,PartitionTransformation由于不包含具体的操作所以都没有StreamOperator成员变量,而其他StreamTransformation的子类基本上都有。

StreamOperator

DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。下图所示为 StreamOperator 的类图(点击查看大图):

可以发现,所有实现类都继承了AbstractStreamOperator。另外除了 project 操作,其他所有可以执行UDF代码的实现类都继承自AbstractUdfStreamOperator,该类是封装了UDF的StreamOperator。UDF就是实现了Function接口的类,如MapFunction,FilterFunction。

生成 StreamGraph 的源码分析

我们通过在DataStream上做了一系列的转换(map、filter等)得到了StreamTransformation集合,然后通过StreamGraphGenerator.generate获得StreamGraph,该方法的源码如下:

// 构造 StreamGraph 入口函数
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
    return new StreamGraphGenerator(env).generateInternal(transformations);
}

// 自底向上(sink->source)对转换树的每个transformation进行转换。
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
  for (StreamTransformation<?> transformation: transformations) {
    transform(transformation);
  }
  return streamGraph;
}

// 对具体的一个transformation进行转换,转换成 StreamGraph 中的 StreamNode 和 StreamEdge
// 返回值为该transform的id集合,通常大小为1个(除FeedbackTransformation)
private Collection<Integer> transform(StreamTransformation<?> transform) {  
  // 跳过已经转换过的transformation
  if (alreadyTransformed.containsKey(transform)) {
    return alreadyTransformed.get(transform);
  }

  LOG.debug("Transforming " + transform);

  // 为了触发 MissingTypeInfo 的异常
  transform.getOutputType();

  Collection<Integer> transformedIds;
  if (transform instanceof OneInputTransformation<?, ?>) {
    transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
  } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
    transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
  } else if (transform instanceof SourceTransformation<?>) {
    transformedIds = transformSource((SourceTransformation<?>) transform);
  } else if (transform instanceof SinkTransformation<?>) {
    transformedIds = transformSink((SinkTransformation<?>) transform);
  } else if (transform instanceof UnionTransformation<?>) {
    transformedIds = transformUnion((UnionTransformation<?>) transform);
  } else if (transform instanceof SplitTransformation<?>) {
    transformedIds = transformSplit((SplitTransformation<?>) transform);
  } else if (transform instanceof SelectTransformation<?>) {
    transformedIds = transformSelect((SelectTransformation<?>) transform);
  } else if (transform instanceof FeedbackTransformation<?>) {
    transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
  } else if (transform instanceof CoFeedbackTransformation<?>) {
    transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
  } else if (transform instanceof PartitionTransformation<?>) {
    transformedIds = transformPartition((PartitionTransformation<?>) transform);
  } else {
    throw new IllegalStateException("Unknown transformation: " + transform);
  }

  // need this check because the iterate transformation adds itself before
  // transforming the feedback edges
  if (!alreadyTransformed.containsKey(transform)) {
    alreadyTransformed.put(transform, transformedIds);
  }

  if (transform.getBufferTimeout() > 0) {
    streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
  }
  if (transform.getUid() != null) {
    streamGraph.setTransformationId(transform.getId(), transform.getUid());
  }

  return transformedIds;
}

最终都会调用 transformXXX 来对具体的StreamTransformation进行转换。我们可以看下transformOnInputTransform(transform)的实现:

private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {
// 递归对该transform的直接上游transform进行转换,获得直接上游id集合
Collection<Integer> inputIds = transform(transform.getInput());

// 递归调用可能已经处理过该transform了
if (alreadyTransformed.containsKey(transform)) {
  return alreadyTransformed.get(transform);
}

String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

// 添加 StreamNode
streamGraph.addOperator(transform.getId(),
    slotSharingGroup,
    transform.getOperator(),
    transform.getInputType(),
    transform.getOutputType(),
    transform.getName());

if (transform.getStateKeySelector() != null) {
  TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
  streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}

streamGraph.setParallelism(transform.getId(), transform.getParallelism());

// 添加 StreamEdge
for (Integer inputId: inputIds) {
  streamGraph.addEdge(inputId, transform.getId(), 0);
}

return Collections.singleton(transform.getId());
}

该函数首先会对该transform的上游transform进行递归转换,确保上游的都已经完成了转化。然后通过transform构造出StreamNode,最后与上游的transform进行连接,构造出StreamNode。

最后再来看下对逻辑转换(partition、union等)的处理,如下是transformPartition函数的源码:

private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
  StreamTransformation<T> input = partition.getInput();
  List<Integer> resultIds = new ArrayList<>();

  // 直接上游的id
  Collection<Integer> transformedIds = transform(input);
  for (Integer transformedId: transformedIds) {
    // 生成一个新的虚拟id
    int virtualId = StreamTransformation.getNewNodeId();
    // 添加一个虚拟分区节点,不会生成 StreamNode
    streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
    resultIds.add(virtualId);
  }

  return resultIds;
}

对partition的转换没有生成具体的StreamNodeStreamEdge,而是添加一个虚节点。当partition的下游transform(如map)添加edge时(调用StreamGraph.addEdge),会把partition信息写入到edge中。如StreamGraph.addEdgeInternal所示:

public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
  addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, null, new ArrayList<String>());
}
private void addEdgeInternal(Integer upStreamVertexID,
    Integer downStreamVertexID,
    int typeNumber,
    StreamPartitioner<?> partitioner,
    List<String> outputNames) {

  // 当上游是select时,递归调用,并传入select信息
  if (virtualSelectNodes.containsKey(upStreamVertexID)) {
    int virtualId = upStreamVertexID;
    // select上游的节点id
    upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
    if (outputNames.isEmpty()) {
      // selections that happen downstream override earlier selections
      outputNames = virtualSelectNodes.get(virtualId).f1;
    }
    addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
  } 
  // 当上游是partition时,递归调用,并传入partitioner信息
  else if (virtuaPartitionNodes.containsKey(upStreamVertexID)) {
    int virtualId = upStreamVertexID;
    // partition上游的节点id
    upStreamVertexID = virtuaPartitionNodes.get(virtualId).f0;
    if (partitioner == null) {
      partitioner = virtuaPartitionNodes.get(virtualId).f1;
    }
    addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
  } else {
    // 真正构建StreamEdge
    StreamNode upstreamNode = getStreamNode(upStreamVertexID);
    StreamNode downstreamNode = getStreamNode(downStreamVertexID);

    // 未指定partitioner的话,会为其选择 forward 或 rebalance 分区。
    if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
      partitioner = new ForwardPartitioner<Object>();
    } else if (partitioner == null) {
      partitioner = new RebalancePartitioner<Object>();
    }

    // 健康检查, forward 分区必须要上下游的并发度一致
    if (partitioner instanceof ForwardPartitioner) {
      if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
        throw new UnsupportedOperationException("Forward partitioning does not allow " +
            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
      }
    }
    // 创建 StreamEdge
    StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
    // 将该 StreamEdge 添加到上游的输出,下游的输入
    getStreamNode(edge.getSourceId()).addOutEdge(edge);
    getStreamNode(edge.getTargetId()).addInEdge(edge);
  }
}

实例讲解

如下程序,是一个从 Source 中按行切分成单词并过滤输出的简单流程序,其中包含了逻辑转换:随机分区shuffle。我们会分析该程序是如何生成StreamGraph的。

DataStream<String> text = env.socketTextStream(hostName, port);
text.flatMap(new LineSplitter()).shuffle().filter(new HelloFilter()).print();

首先会在env中生成一棵transformation树,用List<StreamTransformation<?>>保存。其结构图如下:


其中符号*为input指针,指向上游的transformation,从而形成了一棵transformation树。然后,通过调用StreamGraphGenerator.generate(env, transformations)来生成StreamGraph。自底向上递归调用每一个transformation,也就是说处理顺序是Source->FlatMap->Shuffle->Filter->Sink

如上图所示:

  1. 首先处理的Source,生成了SourceStreamNode。
  2. 然后处理的FlatMap,生成了FlatMapStreamNode,并生成StreamEdge连接上游SourceFlatMap。由于上下游的并发度不一样(1:4),所以此处是Rebalance分区。
  3. 然后处理的Shuffle,由于是逻辑转换,并不会生成实际的节点。将partitioner信息暂存在virtuaPartitionNodes中。
  4. 在处理Filter时,生成了FilterStreamNode。发现上游是shuffle,找到shuffle的上游FlatMap,创建StreamEdgeFilter相连。并把ShufflePartitioner的信息写到StreamEdge中。
  5. 最后处理Sink,创建SinkStreamNode,并生成StreamEdge与上游Filter相连。由于上下游并发度一样(4:4),所以此处选择Forward分区。

最后可以通过 UI可视化 来观察得到的 StreamGraph。


总结

本文主要介绍了 Stream API 中 Transformation 和 Operator 的概念,以及如何根据Stream API编写的程序,构造出一个代表拓扑结构的StreamGraph的。本文的源码分析涉及到较多代码,如果有兴趣建议结合完整源码进行学习。下一篇文章将介绍 StreamGraph 如何转换成 JobGraph 的,其中设计到了图优化的技巧。


转载:http://wuchong.me/blog/2016/05/04/flink-internal-how-to-build-streamgraph/

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容

  • 在Flink中,由用户代码生成调度层图结构,可以分成3步走:通过Stream API编写的用户代码 -> Stre...
    MaQingxiang阅读 4,489评论 0 18
  • Flink 集群构建 & 逻辑计划生成 转载:Flink 集群构建 & 逻辑计划生成 概要和背景 flink是一个...
    raincoffee阅读 727评论 0 0
  • Data Sources 源是程序读取输入数据的位置。可以使用 StreamExecutionEnvironmen...
    Alex90阅读 2,953评论 0 1
  • 好變態不斷的提醒讓我以為這個週末是我生日,表姐的窗口跳出來上書,這週我去找你啊。我大言不慚的敲過去週六我生日!...
    宋小朝阅读 230评论 0 0
  • 李占达 秋天来了,树叶有的变黄了,有的变红了,有的飘落下来了,草也枯了,但是值得值...
    李占达阅读 371评论 0 1