Flink JobGraph: Source Code Reading and Analysis

1. Overview

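This article traces, through the source code, how a Java program written by a user is turned into a Flink JobGraph. Interested readers are welcome to join the discussion.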

2. User Program

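import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {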

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

Now let's trace the code.

  • StreamExecutionEnvironment.getExecutionEnvironment() works as follows:
public static StreamExecutionEnvironment getExecutionEnvironment() {
    // In the streaming environment this is always null
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }

    // because the streaming project depends on "flink-clients" (and not the other way around)
    // we currently need to intercept the data set environment and create a dependent stream env.
    // this should be fixed once we rework the project dependencies
    // Execution always reaches this point. As mentioned in an earlier post, this calls
    // ContextEnvironmentFactory.createExecutionEnvironment(), which returns a ContextEnvironment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}

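The code above ultimately returns StreamContextEnvironment((ContextEnvironment) env). Next, env creates a DataStream through socketTextStream(). Before looking at the operators in the example, here is the inheritance hierarchy of the DataStream classes, shown in the figure below: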

[Figure: DataStream class inheritance hierarchy]

socketTextStream() creates a DataStreamSource. Another important DataStream subclass is KeyedStream, which the keyBy() API produces. Common operators such as map() and filter() produce a SingleOutputStreamOperator.
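
The sketch below makes these class relationships concrete with hypothetical stand-in classes nested in a holder class `DataStreamHierarchySketch`. These are not Flink's real types. The sketch models only which call returns which kind of stream. A source is itself a `SingleOutputStreamOperator`, `map()`-style operators return a `SingleOutputStreamOperator`, and `keyBy()` returns a `KeyedStream`, which extends `DataStream` directly.

```java
import java.util.function.Function;

// Hypothetical, simplified stand-ins for Flink's DataStream classes (not the real API).
final class DataStreamHierarchySketch {

    static class DataStream<T> {
        // map()/flatMap()/filter() produce a SingleOutputStreamOperator.
        <R> SingleOutputStreamOperator<R> map(Function<T, R> mapper) {
            return new SingleOutputStreamOperator<>();
        }

        // keyBy() produces a KeyedStream.
        <K> KeyedStream<T, K> keyBy(Function<T, K> keySelector) {
            return new KeyedStream<>();
        }
    }

    // Result of most one-input operators.
    static class SingleOutputStreamOperator<T> extends DataStream<T> { }

    // Result of socketTextStream()/addSource(); it is also a SingleOutputStreamOperator.
    static class DataStreamSource<T> extends SingleOutputStreamOperator<T> { }

    // Result of keyBy(); extends DataStream directly.
    static class KeyedStream<T, K> extends DataStream<T> { }

    private DataStreamHierarchySketch() { }
}
```

In real Flink this split matters because keyed-only operations used in the example, such as timeWindow() and sum(), are available on KeyedStream.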

Back to socketTextStream(). Its code is:

    public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
        return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
                "Socket Stream");
    }

It eventually calls:

    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

        if (typeInfo == null) {
            if (function instanceof ResultTypeQueryable) {
                typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
            } else {
                try {
                    typeInfo = TypeExtractor.createTypeInfo(
                            SourceFunction.class,
                            function.getClass(), 0, null, null);
                } catch (final InvalidTypesException e) {
                    typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
                }
            }
        }

        boolean isParallel = function instanceof ParallelSourceFunction;

        clean(function);

        final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
    }

Here the function argument is a SocketTextStreamFunction, which reads the contents of the socket. addSource() wraps it in a StreamSource operator. The code above works as follows:

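  1. First, determine the function's output type. Following the code path, SocketTextStreamFunction yields a TypeInformation<String>.
  2. Clean the function's closure (clean()).
  3. Construct the source operator (StreamSource) and wrap it in a DataStreamSource, whose constructor is: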
    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

        this.isParallel = isParallel;
        if (!isParallel) {
            setParallelism(1);
        }
    }
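
The three steps can be sketched as follows. Everything here is a hypothetical simplification: `AddSourceSketch`, its `TypeInfo` class and the `addSource` signature are illustrative names, not Flink's API. The sketch shows the fallback order for the output type: first a declared type, then reflective extraction, then a placeholder that fails only when it is read, which plays the role of `MissingTypeInfo`. It also shows that a non-parallel source ends up with parallelism 1.

```java
import java.util.function.Supplier;

// Hypothetical model of the three addSource() steps; none of these names are Flink's API.
final class AddSourceSketch {

    /** Stand-in for TypeInformation; a "missing" type only fails when it is read. */
    static final class TypeInfo {
        private final String name;               // null means the type could not be determined
        private final RuntimeException cause;

        private TypeInfo(String name, RuntimeException cause) {
            this.name = name;
            this.cause = cause;
        }

        static TypeInfo of(String name) {
            return new TypeInfo(name, null);
        }

        static TypeInfo missing(RuntimeException cause) {
            return new TypeInfo(null, cause);
        }

        String get() {
            if (name == null) {
                throw new IllegalStateException("output type could not be determined", cause);
            }
            return name;
        }
    }

    /** Stand-in for the SourceTransformation created by the DataStreamSource constructor. */
    static final class SourceTransformation {
        final String name;
        final TypeInfo outputType;
        final int parallelism;

        SourceTransformation(String name, TypeInfo outputType, int parallelism) {
            this.name = name;
            this.outputType = outputType;
            this.parallelism = parallelism;
        }
    }

    /**
     * declaredType plays the role of a ResultTypeQueryable's produced type (null if absent);
     * extractor plays the role of reflective type extraction and may throw.
     */
    static SourceTransformation addSource(String sourceName, String declaredType,
                                          Supplier<String> extractor,
                                          boolean isParallel, int envParallelism) {
        // 1. Resolve the output type: declared type, then extraction, then a deferred failure.
        TypeInfo type;
        if (declaredType != null) {
            type = TypeInfo.of(declaredType);
        } else {
            try {
                type = TypeInfo.of(extractor.get());
            } catch (RuntimeException e) {
                type = TypeInfo.missing(e);
            }
        }
        // 2. Closure cleaning is omitted here.
        // 3. Wrap the source; a non-parallel source is forced to parallelism 1.
        int parallelism = isParallel ? envParallelism : 1;
        return new SourceTransformation(sourceName, type, parallelism);
    }

    private AddSourceSketch() { }
}
```

Deferring the failure matches the comment in transform() below, which reads the input's output type "to coax out errors about MissingTypeInfo". The error surfaces when a downstream operator is attached, not when the source is created.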

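In Flink, the StreamOperator and Transformation classes are related by inheritance and implementation as follows:

[Figure: StreamOperator class hierarchy]
[Figure: Transformation class hierarchy]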

The subsequent calls, such as .flatMap(), are all methods provided by DataStream. The code of flatMap() is:

    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
                getType(), Utils.getCallLocationName(), true);

        return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));

    }

    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operator,
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }

The overall logic is:

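  1. The user's function is wrapped in a StreamOperator, structured as follows:

    [Figure: StreamOperator structure]
  2. The current Transformation is built from the input transformation and the StreamOperator:

    [Figure: Transformation structure]
  3. A DataStream is built from the current Transformation.

  4. The current Transformation is added to the transformations of the current StreamExecutionEnvironment. This data structure is used later when the execution plan is generated.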

The DataStreams of the other operators, such as keyBy() and filter(), are generated in a similar way. Interested readers can look them up.
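
The four steps above can be modeled in a few lines. This is a hypothetical sketch: `TransformationRecordingSketch`, `Env` and `Stream` are made-up names. Each call takes the previous transformation as its input. Only operator transformations are registered in the environment's list; the source is reachable through the inputs. The addSource() code shown earlier likewise never calls addOperator().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how DataStream calls record transformations in the environment.
final class TransformationRecordingSketch {

    static final class Transformation {
        private static int idCounter = 0;        // ids come from a global counter
        final int id;
        final String name;
        final Transformation input;              // null for a source
        final int parallelism;

        Transformation(String name, Transformation input, int parallelism) {
            this.id = ++idCounter;
            this.name = name;
            this.input = input;
            this.parallelism = parallelism;
        }
    }

    static final class Env {
        final int parallelism;
        final List<Transformation> transformations = new ArrayList<>();

        Env(int parallelism) {
            this.parallelism = parallelism;
        }

        Stream addSource(String name) {
            // Like addSource(): the source is NOT added to the list; consumers reach it via their input.
            return new Stream(this, new Transformation(name, null, parallelism));
        }
    }

    static final class Stream {
        final Env env;
        final Transformation transformation;

        Stream(Env env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        // Like DataStream#transform(): the new transformation takes this one as input,
        // gets the environment's default parallelism, and is registered in the environment.
        Stream transform(String operatorName) {
            Transformation t = new Transformation(operatorName, transformation, env.parallelism);
            env.transformations.add(t);          // cf. getExecutionEnvironment().addOperator(resultTransform)
            return new Stream(env, t);
        }
    }

    private TransformationRecordingSketch() { }
}
```

For example, `env.addSource("Socket Stream").transform("Flat Map").transform("Sink")` registers two transformations. The source is found later by following the inputs backwards from "Flat Map".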

Finally, here is a summary of the call relationships among DataStream, StreamOperator, and StreamTransformation:

[Figure: relationships among DataStream, StreamOperator, and StreamTransformation]

3. StreamGraph

StreamContextEnvironment starts executing the job through execute(). The main code is:

    public JobExecutionResult execute(String jobName) throws Exception {
        // ...
        StreamGraph streamGraph = this.getStreamGraph();
        // ...
        return ctx
                .getClient()
                .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
                .getJobExecutionResult();
    }

getStreamGraph() is the main function for obtaining the StreamGraph. It delegates to StreamGraphGenerator.generate(), whose core code is:

    public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
        // env is passed to the StreamGraphGenerator constructor, mainly for its configuration; the transformations go to generateInternal()
        return new StreamGraphGenerator(env).generateInternal(transformations);
    }

The transformations parameter is the collection of transformations produced by the whole job. Each API operator call produces one or more transformations. generateInternal() is called next. Its code is:

    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation: transformations) {
            // iterate over every transformation and translate it
            transform(transformation);
        }
        return streamGraph;
    }

generateInternal() calls transform() on each transformation. The core logic of that method is:

        // ...
        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }
        // ...
        return transformedIds;

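The logic above checks the transformation's type and dispatches to the matching translate method. In the elided part of transform(), each result is cached in a Map<StreamTransformation, Collection<Integer>> (alreadyTransformed). The Collection<Integer> holds the ids of the StreamGraph nodes that the transformation was translated into. For example, when the same DataStream feeds several sinks, its transformation is reached once per sink. The second time, the cached ids are returned instead of translating it again; in that case the collection has size 1. A UnionTransformation, by contrast, creates no node of its own and returns the ids of all of its inputs. Let's use transformOneInputTransform() as an example: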

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
        // translate this transformation's input first (recursively)
        Collection<Integer> inputIds = transform(transform.getInput());

        // ...
        // The slot sharing group is used later when slots are assigned to tasks; it decides which
        // tasks may share a slot (tasks in different groups go to different slots). Ignore it for now
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
        // add a StreamNode for this transformation to the StreamGraph
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getCoLocationGroupKey(),
                transform.getOperatorFactory(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());
        // ...
        // Add the edges for the input relationship. A regular operator only adds an edge from each
        // input node to this node; special StreamNodes such as split/select are a bit more
        // involved, as explained below
        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }

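That is how transformOneInputTransform() works, and the other Transformation types are handled similarly. The StreamGraph therefore records two key things: a node for each Transformation and the edges connecting them. JobGraph generation later relies mainly on these two data structures.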
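
Taken together, transform() and transformOneInputTransform() perform a depth-first recursion over the inputs, backed by a cache. The sketch below is a hypothetical model; the class and its string edges are illustrative, not Flink code. Each transformation becomes one node, and edges run from the input node ids to the new node. A shared input is translated only once, and a union contributes its inputs' ids instead of a node of its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of StreamGraphGenerator: recursive translation with an alreadyTransformed cache.
final class StreamGraphGeneratorSketch {

    static final class Transformation {
        final int id;
        final String name;
        final boolean isUnion;                    // a union creates no node of its own
        final List<Transformation> inputs;

        Transformation(int id, String name, boolean isUnion, Transformation... inputs) {
            this.id = id;
            this.name = name;
            this.isUnion = isUnion;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // The StreamGraph reduced to node names and "source->target" edges.
    final Map<Integer, String> nodes = new LinkedHashMap<>();
    final List<String> edges = new ArrayList<>();
    private final Map<Transformation, Collection<Integer>> alreadyTransformed = new HashMap<>();

    Collection<Integer> transform(Transformation t) {
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached;                        // e.g. one stream feeding several sinks
        }
        List<Integer> inputIds = new ArrayList<>();
        for (Transformation input : t.inputs) {
            inputIds.addAll(transform(input));    // translate the inputs first
        }
        Collection<Integer> result;
        if (t.isUnion) {
            result = inputIds;                    // downstream connects to every union input
        } else {
            nodes.put(t.id, t.name);              // cf. streamGraph.addOperator(...)
            for (int inputId : inputIds) {
                edges.add(inputId + "->" + t.id); // cf. streamGraph.addEdge(inputId, id, 0)
            }
            result = Collections.singletonList(t.id);
        }
        alreadyTransformed.put(t, result);
        return result;
    }
}
```

Consider a source (1) feeding a map (2), which in turn feeds two sinks (3 and 4). Calling `transform()` on each sink yields four nodes and the edges `1->2`, `2->3`, `2->4`, and the map is translated only once.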

The StreamGraph code shows that its main data structures are:

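  • StreamNode: mainly encapsulates a transformation and its metadata
  • StreamEdge: connects StreamNodes and expresses the dependencies between them
    They are organized as follows:
[Figure: StreamGraph structure]
[Figure: StreamNode structure]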

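StreamNodes are connected by StreamEdges. A StreamNode can have multiple input edges and multiple output edges, for example when an operator has several inputs and several outputs. A StreamNode contains the following key data structures: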

  • inEdges and outEdges: the node's input and output edges.
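  • parallelism: the node's parallelism. It can be set per node and is used when the ExecutionGraph is generated. If it is not set, it defaults to the environment's default parallelism (see environment.getParallelism() in the transform() code above).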
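  • slotSharingGroup: affects how the generated tasks are assigned to slots. It is covered in detail later.
  • jobVertexClass: the entry class of the task that runs once the job has been turned into an ExecutionGraph. Each Execution corresponds to one jobVertexClass, whose base class is AbstractInvokable. The hierarchy is: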


    [Figure: task classes corresponding to a StreamNode]

When the ExecutionGraph runs on the TaskManagers, execution enters through these classes, which handle tasks such as task initialization and checkpointing.

  • id: the id of the corresponding Transformation, which gives a one-to-one mapping between StreamNodes and Transformations.

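The StreamGraph also contains a special kind of StreamNode: virtual nodes. In the DataStream API, virtual nodes correspond to operations such as 'DataStream.split(xxx).select(xxx)', and Flink handles these special nodes separately.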

Handling Split and Select

Suppose a DataStream has the two operators split and select, organized as streamA.split().select(). In the corresponding StreamGraph, this part looks like this:


[Figure: split & select in the StreamGraph]

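Translating the split operator creates no new StreamNode. Instead, the OutputSelector is registered in the outputSelectors (a List<OutputSelector>) of the upstream StreamNode A, and the split simply returns A's id. Translating the select first translates its input (the split), then creates the virtual node B by adding an entry to the StreamGraph's virtualSelectNodes map. The key is virtual node B's id, and the value is a Tuple2 of (StreamNode A's id, selectNames). The real input of virtual node B is therefore StreamNode A, and selectNames determine which of A's split outputs B receives.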

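Where, then, does the edge to the downstream node come from? It is determined only when the downstream StreamNode is translated. When OtherNode is translated (see transformOneInputTransform() above), its input is the virtual StreamNode, and the edge-adding logic contains: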

    private void addEdgeInternal(Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag) {

        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
            if (outputTag == null) {
                outputTag = virtualSideOutputNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
        } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
            // the first call lands here, because upStreamVertexID is a virtual StreamNode
            int virtualId = upStreamVertexID;
            // get the id of the select node's real input, i.e. StreamNode A's id
            upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
            if (outputNames.isEmpty()) {
                // selections that happen downstream override earlier selections
                outputNames = virtualSelectNodes.get(virtualId).f1;
            }
            // call addEdgeInternal again, now with StreamNode A as the upstream node
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
        } else {
            // the second (recursive) call lands here
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);

            // ...
            // Build the StreamEdge. In the split/select model the edge stores the outputNames,
            // while the upstream node holds the selector function
            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);

            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }

FeedBack, Partition, SideOutput

These are handled in much the same way as Split/Select; see above.
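
The recursive resolution in addEdgeInternal() can be reduced to the hypothetical sketch below. `VirtualNodeSketch` and its maps are illustrative, and strings stand in for partitioners. Virtual ids are looked up until a real node is reached. A selection made further downstream overrides an upstream one, and a partitioner is taken from a virtual partition node only when none was set further downstream. The default partitioner applied at the end is an assumption based on our reading of the part of addEdgeInternal() elided above: forward when both sides have the same parallelism, rebalance otherwise.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the virtual-node resolution in addEdgeInternal(); not Flink's code.
final class VirtualNodeSketch {

    static final class Edge {
        final int source;
        final int target;
        final List<String> selectedNames;
        final String partitioner;

        Edge(int source, int target, List<String> selectedNames, String partitioner) {
            this.source = source;
            this.target = target;
            this.selectedNames = selectedNames;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, Integer> parallelism = new HashMap<>();        // real nodes only
    final Map<Integer, Integer> selectUpstream = new HashMap<>();     // virtual select id -> upstream id
    final Map<Integer, List<String>> selectNames = new HashMap<>();   // virtual select id -> selected names
    final Map<Integer, Integer> partitionUpstream = new HashMap<>();  // virtual partition id -> upstream id
    final Map<Integer, String> partitionerOf = new HashMap<>();       // virtual partition id -> partitioner
    final List<Edge> edges = new ArrayList<>();

    void addEdge(int upstream, int downstream, List<String> names, String partitioner) {
        if (selectUpstream.containsKey(upstream)) {
            // A selection made further downstream overrides this one.
            List<String> resolved = names.isEmpty() ? selectNames.get(upstream) : names;
            addEdge(selectUpstream.get(upstream), downstream, resolved, partitioner);
        } else if (partitionUpstream.containsKey(upstream)) {
            // Same rule for partitioners: keep one that was already set.
            String resolved = partitioner == null ? partitionerOf.get(upstream) : partitioner;
            addEdge(partitionUpstream.get(upstream), downstream, names, resolved);
        } else {
            // A real node was reached (assumption: default partitioner chosen by parallelism).
            String p = partitioner;
            if (p == null) {
                boolean sameParallelism = parallelism.get(upstream).equals(parallelism.get(downstream));
                p = sameParallelism ? "forward" : "rebalance";
            }
            edges.add(new Edge(upstream, downstream, names, p));
        }
    }
}
```

For example, suppose virtual select node 10 points at real node 1 with the name "even", and virtual partition node 11 with a "hash" partitioner sits on top of node 10. An edge from 11 to 4 then resolves to a real edge 1->4 carrying both the "even" selection and the "hash" partitioner.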

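The end result is a StreamGraph containing all StreamNodes and StreamEdges. A complete JobGraph is generated from these StreamNodes and StreamEdges, together with each node's configuration (parallelism, slot sharing group, and so on).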

4. JobGraph

Now that we have the StreamGraph, let's look at how the JobGraph is generated. The call chain is ClusterClient#run() --> ClusterClient#getJobGraph() --> StreamingPlan#getJobGraph() --> StreamGraph#getJobGraph() --> StreamingJobGraphGenerator#createJobGraph(StreamGraph) (static entry point) --> StreamingJobGraphGenerator#createJobGraph() (instance method)

The main logic is in createJobGraph():

private JobGraph createJobGraph() {

        // make sure that all vertices start immediately
        jobGraph.setScheduleMode(ScheduleMode.EAGER);

        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
        // explained in detail below
        setChaining(hashes, legacyHashes, chainedOperatorHashes);
        // set the physical edges based on the result of setChaining
        setPhysicalEdges();
        // set the JobGraph's SlotSharingGroup and CoLocationGroup
        setSlotSharingAndCoLocation();
        /*
          Configure checkpointing for each JobVertex of the JobGraph,
          e.g. source JobVertices must trigger checkpoints, and
          all JobVertices must acknowledge and commit checkpoints
        */
        configureCheckpointing();

        JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);

        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                    "This indicates that non-serializable types (like custom serializers) were registered");
        }

        return jobGraph;
    }

The steps are:

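  1. Traverse the StreamGraph (computing node hashes)
  2. Build the operator chains
  3. Set the physical edges
  4. Set the SlotSharingGroup (and CoLocationGroup)
  5. Configure checkpointing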

4.1 Traversal

Traversal of the StreamGraph starts from the sources and computes a hash for every StreamNode. A StreamNode's hash is computed only after the hashes of all of its input nodes have been computed.
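
The ordering rule can be sketched as a breadth-first traversal that postpones a node until all of its inputs have been hashed. This is a hypothetical model. Maps from node id to input and output ids stand in for StreamNode/StreamEdge, and the arithmetic hash is a placeholder, not the hash function Flink uses. A node dequeued too early is simply removed from the visited set, so that its last missing input enqueues it again.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the hashing traversal order; the hash arithmetic is a placeholder.
final class NodeHashTraversalSketch {

    /** inputs/outputs map a node id to its upstream/downstream node ids. */
    static Map<Integer, Long> hashAll(List<Integer> sources,
                                      Map<Integer, List<Integer>> inputs,
                                      Map<Integer, List<Integer>> outputs) {
        Map<Integer, Long> hashes = new LinkedHashMap<>();  // insertion order = hashing order
        Deque<Integer> remaining = new ArrayDeque<>(sources);
        Set<Integer> visited = new HashSet<>(sources);

        while (!remaining.isEmpty()) {
            int node = remaining.poll();
            List<Integer> ins = inputs.getOrDefault(node, Collections.emptyList());
            if (!hashes.keySet().containsAll(ins)) {
                // Not ready yet: forget it, so the last missing input enqueues it again.
                visited.remove(node);
                continue;
            }
            long h = 17L * 31 + node;                        // placeholder, not Flink's hash function
            for (int in : ins) {
                h = h * 31 + hashes.get(in);                 // depends on the inputs' hashes
            }
            hashes.put(node, h);
            for (int out : outputs.getOrDefault(node, Collections.emptyList())) {
                if (visited.add(out)) {
                    remaining.add(out);
                }
            }
        }
        return hashes;
    }

    private NodeHashTraversalSketch() { }
}
```

Take sources 1 and 2 with edges 1->4, 2->3 and 3->4. Node 4 is dequeued once before node 3 has been hashed, so it is postponed; it is hashed last, giving the order 1, 2, 3, 4.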

4.2 Operator Chain

4.2.1 Operator chains and their purpose

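In the StreamGraph, each operator corresponds to one StreamNode. Consider a common case: in DataStream.map().filter(), the map and filter operators become separate StreamNodes and eventually separate tasks. If these two tasks are not in the same slot, or not even on the same TaskManager, the data has to travel over the network from map to filter, which hurts performance. To avoid this, Flink introduces operator chains. An operator chain is a sequence of operators that can be executed together as a single task in the same slot.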

4.2.2 When can operators be chained?

According to the source code, an upstream operator A and a downstream operator can be chained when all of the following hold:

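  • the downstream operator has exactly one input, namely the upstream operator
  • both belong to the same SlotSharingGroup
  • chaining is enabled
  • the partitioner on the edge is a ForwardPartitioner
  • both have the same parallelism
  • their ChainingStrategy allows chaining (ALWAYS for the downstream operator, HEAD or ALWAYS for the upstream one)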

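A chain can also contain more than two operators, as long as every pair of consecutive operators satisfies these conditions. In code (isChainable()):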

        return downStreamVertex.getInEdges().size() == 1
                && outOperator != null
                && headOperator != null
                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                    headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled();
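
Here are the same conditions written as a self-contained predicate. This is a hypothetical simplification. The `Node` fields, the boolean `forwardPartitioner` flag and the `ChainingStrategy` enum stand in for StreamNode, the edge's partitioner and Flink's own enum, and a null strategy stands for a missing operator.

```java
import java.util.Objects;

// Hypothetical simplification of isChainable(); the types are stand-ins for Flink's.
final class ChainabilitySketch {

    enum ChainingStrategy { ALWAYS, HEAD, NEVER }

    static final class Node {
        final int inEdges;                 // number of input edges
        final String slotSharingGroup;
        final ChainingStrategy strategy;   // null stands for "no operator"
        final int parallelism;

        Node(int inEdges, String slotSharingGroup, ChainingStrategy strategy, int parallelism) {
            this.inEdges = inEdges;
            this.slotSharingGroup = slotSharingGroup;
            this.strategy = strategy;
            this.parallelism = parallelism;
        }
    }

    static boolean isChainable(Node up, Node down, boolean forwardPartitioner, boolean chainingEnabled) {
        return down.inEdges == 1
                && up.strategy != null
                && down.strategy != null
                && Objects.equals(up.slotSharingGroup, down.slotSharingGroup)
                && down.strategy == ChainingStrategy.ALWAYS
                && (up.strategy == ChainingStrategy.HEAD || up.strategy == ChainingStrategy.ALWAYS)
                && forwardPartitioner
                && up.parallelism == down.parallelism
                && chainingEnabled;
    }

    private ChainabilitySketch() { }
}
```

With this predicate, a map followed by a filter at the same parallelism can be chained. Changing the filter's parallelism, using a non-forward partitioner, or disabling chaining breaks the chain.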

4.2.3 Code walkthrough

Let's use a simple StreamGraph to illustrate how the JobGraph is generated:


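[Figure: a simple StreamGraph: StreamNode A (id 1) --edge D--> StreamNode B (id 2) --edge E--> StreamNode C (id 3); edge D cannot be chained, edge E can]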

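For this graph, createChain() is invoked three times. setChaining() calls it once for the source A, and the other two calls are recursive.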
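
Before going through the three calls, the recursion can be summarized with a stripped-down, hypothetical model of createChain() that ignores hashes, configs and resources. `CreateChainSketch` and its string edges are illustrative. A chainable edge extends the current chain. A non-chainable edge becomes an out edge of the chain and starts a new chain at its target. Only the chain head connects its out edges as physical JobGraph edges.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, stripped-down model of createChain(); hashes, configs and resources are omitted.
final class CreateChainSketch {

    final Map<Integer, List<Integer>> outputs = new HashMap<>();        // node -> downstream nodes
    final Set<String> chainableEdges = new HashSet<>();                // "up->down" edges that pass isChainable()
    final Map<Integer, List<Integer>> chains = new LinkedHashMap<>();  // chain head -> chained nodes
    final Map<Integer, String> chainedNames = new HashMap<>();
    final List<String> physicalEdges = new ArrayList<>();              // edges between JobVertices
    private final Set<Integer> builtVertices = new HashSet<>();

    List<String> createChain(int startNodeId, int currentNodeId) {
        if (builtVertices.contains(startNodeId)) {
            return new ArrayList<>();
        }
        chains.computeIfAbsent(startNodeId, k -> new ArrayList<>()).add(currentNodeId);

        List<String> transitiveOutEdges = new ArrayList<>();
        List<Integer> chainedTargets = new ArrayList<>();
        for (int target : outputs.getOrDefault(currentNodeId, Collections.emptyList())) {
            String edge = currentNodeId + "->" + target;
            if (chainableEdges.contains(edge)) {
                // Extend the current chain; its exits become this chain's exits.
                chainedTargets.add(target);
                transitiveOutEdges.addAll(createChain(startNodeId, target));
            } else {
                // The edge leaves the chain, and a new chain starts at its target.
                transitiveOutEdges.add(edge);
                createChain(target, target);
            }
        }

        List<String> downstreamNames = new ArrayList<>();
        for (int target : chainedTargets) {
            downstreamNames.add(chainedNames.get(target));
        }
        String own = "Node" + currentNodeId;
        String name = downstreamNames.isEmpty() ? own
                : downstreamNames.size() == 1 ? own + " -> " + downstreamNames.get(0)
                : own + " -> (" + String.join(", ", downstreamNames) + ")";
        chainedNames.put(currentNodeId, name);

        if (currentNodeId == startNodeId) {
            physicalEdges.addAll(transitiveOutEdges);  // cf. connect(startNodeId, edge)
            builtVertices.add(startNodeId);            // cf. createJobVertex()
        }
        return transitiveOutEdges;
    }
}
```

Take the graph above with A=1, B=2, C=3, where only `2->3` is chainable. Calling `createChain(1, 1)` produces the chains {1} and {2, 3}, the chained names `Node1` and `Node2 -> Node3`, and one physical edge `1->2`. This matches the three calls walked through below.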
First call to createChain


    private List<StreamEdge> createChain(
            Integer startNodeId,                     // Node A
            Integer currentNodeId,                   // Node A
            Map<Integer, byte[]> hashes,             // hashes of all StreamNodes, computed earlier
            List<Map<Integer, byte[]>> legacyHashes, // size = 1
            int chainIndex,                          // chainIndex = 0
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) { // empty

        if (!builtVertices.contains(startNodeId)) {

            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
                         
            // currentNodeId is Node A with one out edge (D); chainableOutputs ends up empty and nonChainableOutputs has size 1
            for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }
            // chainableOutputs is empty, so this loop is skipped
            for (StreamEdge chainable : chainableOutputs) {
                transitiveOutEdges.addAll(
                        createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
            }
            // nonChainableOutputs is not empty
            for (StreamEdge nonChainable : nonChainableOutputs) {
                // add edge D
                transitiveOutEdges.add(nonChainable);
                // see the second call to createChain
                createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
            }
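            /*
                Add an entry for Node A's id to chainedOperatorHashes. The second and
                third calls have already added the entry for Node B's id, so the map
                currently holds (hash1 denotes the legacy hash):
                key             value
                Node B id       List<
                                    Tuple2<Node C hash, Node C hash1>
                                    Tuple2<Node B hash, Node B hash1>
                                >
             */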
            List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
            // get Node A's hash
            byte[] primaryHashBytes = hashes.get(currentNodeId);
            /* legacyHashes has size 1, so the loop below adds Node A's hashes under the
               key Node A id in chainedOperatorHashes. The resulting layout is:
                 Key            Value
                 Node B id      List<
                                    Tuple2<Node C hash, Node C hash1>
                                    Tuple2<Node B hash, Node B hash1>
                                >
                 Node A id      List<Tuple2<Node A hash, Node A hash1>>
            */
            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
            }
            /*
                Add Stream Node A's name to chainedNames.
                Afterwards the map holds:
                    <3, StreamNode C>
                    <2, StreamNode B -> StreamNode C>
                    <1, StreamNode A>
            */
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            // the next two (resource settings) are rarely used, so we skip them; interested readers can look into them
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
            /*
                Here config is created by createJobVertex() below, whose main job is to
                build the JobVertex. Its constructor call looks like:

                jobVertex = new JobVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,                        // derived from Node A's hash
                    legacyJobVertexIds,                 // List<Node A hash1>
                    chainedOperatorVertexIds,           // List<Node A hash>: this JobVertex is an operator chain containing a single StreamNode
                    userDefinedChainedOperatorVertexIds // List<Node A hash1>
                );
            */
            StreamConfig config = currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                    : new StreamConfig(new Configuration());
            /*
                Set StreamNode A's configuration, such as its serializers, operator,
                outputs and checkpoint settings. The logic is simple; interested
                readers can step into it.
            */
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
            // currentNodeId equals startNodeId, so this branch is taken
            if (currentNodeId.equals(startNodeId)) {
                // mark Node A as the start of the chain
                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges); // transitiveOutEdges holds edge D
                /*
                  outEdges is edge D
                */
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
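                /*
                  transitiveOutEdges has size 1 (edge D). The loop adds a physical edge to the
                  JobGraph connecting JobVertex A (StreamNode A) and JobVertex B (StreamNodes B
                  and C), using the edge's partitioner. The partitioner decides, based on the
                  downstream parallelism and operator type, how this operator's results are
                  passed downstream, for example:
                    ForwardPartitioner   (upstream and downstream parallelism are equal)
                    RebalancePartitioner (default when the parallelism differs and no partitioner was set)
                    others, e.g. the partitioner introduced by keyBy() for aggregations
                  See the section on Flink parallelism for details.
                */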
                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }

                /*
                   chainedConfigs was filled in by the earlier recursive calls:
                   key        value
                   2          {3: Node C config}
                   There is no entry for Node A, so chainedConfigs.get(startNodeId) is null here.
                */
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {       
                chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            config.setOperatorID(new OperatorID(primaryHashBytes));
                        
            // chainableOutputs.isEmpty() is true, so Node A is also the end of its chain
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            // transitiveOutEdges holds edge D
            return transitiveOutEdges;

        } else {
            return new ArrayList<>();
        }
    }

Second call to createChain


    private List<StreamEdge> createChain(
            Integer startNodeId,                     // Node B
            Integer currentNodeId,                   // Node B
            Map<Integer, byte[]> hashes,             // hashes of all StreamNodes, computed earlier
            List<Map<Integer, byte[]>> legacyHashes, // size = 1
            int chainIndex,                          // chainIndex = 0
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) { // empty

        if (!builtVertices.contains(startNodeId)) {

            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
                         
            // currentNodeId is Node B with one out edge (E); chainableOutputs has size 1 and nonChainableOutputs is empty
            for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }
              
            for (StreamEdge chainable : chainableOutputs) {
                // see the third call to createChain, which returns an empty list
                transitiveOutEdges.addAll(
                        createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
            }
            // nonChainableOutputs is empty, so this loop is skipped
            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
            }
            /*
                computeIfAbsent for Node B's id. The third call has already added the entry
                under Node B's id, so its current value is a List<Tuple2<>> of size 1:
                Tuple2<Node C hash, Node C hash1>
             */
            List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
            // get Node B's hash
            byte[] primaryHashBytes = hashes.get(currentNodeId);
            /* legacyHashes has size 1, so the loop below adds Node B's hashes under the
               key Node B id in chainedOperatorHashes. The resulting layout is:
                 Key            Value
                 Node B id      List<
                                    Tuple2<Node C hash, Node C hash1>
                                    Tuple2<Node B hash, Node B hash1>
                                >
            */
            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
            }
            /*
                Add Stream Node B's chained name to chainedNames.
                The map now holds:
                    <3, StreamNode C>
                    <2, StreamNode B -> StreamNode C>
            */
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            // the next two (resource settings) are rarely used, so we skip them; interested readers can look into them
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
            /*
                Here config is created by createJobVertex() below, whose main job is to
                build the JobVertex. Its constructor call looks like:

                jobVertex = new JobVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,                        // derived from Node B's hash
                    legacyJobVertexIds,                 // List<Node B hash1>
                    chainedOperatorVertexIds,           // List<Node B hash, Node C hash>: this JobVertex is an operator chain containing the two stream operators B and C
                    userDefinedChainedOperatorVertexIds // List<Node B hash1, Node C hash1>
                );
            */
            StreamConfig config = currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                    : new StreamConfig(new Configuration());
            /*
                Set StreamNode B's configuration, such as its serializers, operator,
                outputs and checkpoint settings. The logic is simple; interested
                readers can step into it.
            */
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
            // currentNodeId equals startNodeId, so this branch is taken
            if (currentNodeId.equals(startNodeId)) {
                // mark Node B as the start of the chain
                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges); // transitiveOutEdges is empty
                /*
                  outEdges is edge E
                */
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }

                /*
                   chainedConfigs was filled in by the third call:
                   key        value
                   2          {3: Node C config}
                */
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {       
                chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            config.setOperatorID(new OperatorID(primaryHashBytes));
                        
            // chainableOutputs.isEmpty() is false, so Node B is not marked as the chain end
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            // transitiveOutEdges is empty
            return transitiveOutEdges;

        } else {
            return new ArrayList<>();
        }
    }
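In the second call, Node B is the chain head, yet `chainedConfigs.get(startNodeId)` already holds Node C's config when `setTransitiveChainedTaskConfigs` runs. The reason is ordering: `createChain` recurses into the chainable outputs before it builds the current node's config. The sketch below models only that ordering, under stated assumptions. `SimpleConfig` is a hypothetical stand-in for `StreamConfig` that keeps just a name, a chain index, and start/end flags. The graph is the two-node chain from the comments (id 2 = Node B, id 3 = Node C). Hashing, JobVertex creation, and non-chainable outputs are left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChainedConfigsSketch {

    // Hypothetical stand-in for StreamConfig, holding only the fields discussed above
    static final class SimpleConfig {
        final String operatorName;
        final int chainIndex;
        boolean chainStart;
        boolean chainEnd;
        Map<Integer, SimpleConfig> transitiveChainedConfigs = new HashMap<>();

        SimpleConfig(String operatorName, int chainIndex) {
            this.operatorName = operatorName;
            this.chainIndex = chainIndex;
        }
    }

    // Toy graph: node id -> ids of chainable downstream nodes
    final Map<Integer, List<Integer>> chainableOutputs = new HashMap<>();
    final Map<Integer, String> names = new HashMap<>();
    // startNodeId -> (chained node id -> config), the same shape as chainedConfigs
    final Map<Integer, Map<Integer, SimpleConfig>> chainedConfigs = new HashMap<>();
    final Map<Integer, SimpleConfig> headConfigs = new HashMap<>();

    void createChain(int startNodeId, int currentNodeId, int chainIndex) {
        List<Integer> outputs =
                chainableOutputs.getOrDefault(currentNodeId, Collections.<Integer>emptyList());
        // Recurse first, so downstream configs exist before the head config is finished
        for (int target : outputs) {
            createChain(startNodeId, target, chainIndex + 1);
        }
        SimpleConfig config;
        if (currentNodeId == startNodeId) {
            config = new SimpleConfig(names.get(currentNodeId), 0);
            config.chainStart = true;
            // Already filled by the deeper recursive call(s) above
            config.transitiveChainedConfigs =
                    chainedConfigs.getOrDefault(startNodeId, new HashMap<Integer, SimpleConfig>());
            headConfigs.put(startNodeId, config);
        } else {
            config = new SimpleConfig(names.get(currentNodeId), chainIndex);
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<>()).put(currentNodeId, config);
        }
        // No chainable outputs means this is the last operator in the chain
        config.chainEnd = outputs.isEmpty();
    }

    static ChainedConfigsSketch run() {
        ChainedConfigsSketch s = new ChainedConfigsSketch();
        s.names.put(2, "Node B");
        s.names.put(3, "Node C");
        s.chainableOutputs.put(2, Collections.singletonList(3));
        s.createChain(2, 2, 0); // the "second call": Node B starts the chain
        return s;
    }

    public static void main(String[] args) {
        ChainedConfigsSketch s = run();
        SimpleConfig head = s.headConfigs.get(2);
        SimpleConfig c = head.transitiveChainedConfigs.get(3);
        System.out.println(head.chainStart + " " + head.chainEnd);
        System.out.println(c.operatorName + " index=" + c.chainIndex + " end=" + c.chainEnd);
    }
}
```

Running `main` prints `true false` for the head and `Node C index=1 end=true` for the chained node. These values match the `chainedConfigs` table in the comments.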

Third call to createChain


    private List<StreamEdge> createChain(
            Integer startNodeId,                          // Node B
            Integer currentNodeId,                        // Node C
            Map<Integer, byte[]> hashes,                  // hash of every StreamNode, keyed by node id
            List<Map<Integer, byte[]>> legacyHashes,      // size = 1
            int chainIndex,                               // chainIndex = 1
            Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) { // empty

        if (!builtVertices.contains(startNodeId)) {

            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
                         
            // currentNodeId is Node C, which has no out edges, so chainableOutputs and nonChainableOutputs both stay empty
            for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }

            for (StreamEdge chainable : chainableOutputs) {
                transitiveOutEdges.addAll(
                        createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
            }

            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
            }
            // Put an entry for Node B into chainedOperatorHashes (if absent)
            List<Tuple2<byte[], byte[]>> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
            // Get Node C's hash
            byte[] primaryHashBytes = hashes.get(currentNodeId);
            /* legacyHashes has size 1, so the loop below appends Node C's hashes to the
               chainedOperatorHashes entry whose key is Node B. The structure is:
                 Key                                    Value
                 Node B                                 List<Tuple2<Node C hash1, Node C hash2>>
            */
            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
                operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
            }
            /*
               chainedNames gets the name of StreamNode C.
               Current value:
                       <3, StreamNode C>
            */
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            // The next two are rarely used, so we skip them here; interested readers can look into them
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
            // Here config is created by the new StreamConfig(...) branch below
            StreamConfig config = currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                    : new StreamConfig(new Configuration());
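            /*
                 Sets the configuration of StreamNode C, such as parallelism and checkpoint settings.
                 The logic is straightforward; interested readers can step into it themselves.
            */
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
            // currentNodeId is not equal to startNodeId, so the else block below runs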
            if (currentNodeId.equals(startNodeId)) {

                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

                for (StreamEdge edge : transitiveOutEdges) {
                    connect(startNodeId, edge);
                }

                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {
                /*
                   chainedConfigs holds the operator-chain configuration.
                   chainIndex = 1
                   node = StreamNode C
                   so chainedConfigs ends up as
                   key                        value
                   2                          3, Node C config
                */
                chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            config.setOperatorID(new OperatorID(primaryHashBytes));
                        
            // chainableOutputs.isEmpty() is true, so Node C is marked as the chain end
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            // transitiveOutEdges is empty
            return transitiveOutEdges;

        } else {
            return new ArrayList<>();
        }
    }
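The hash bookkeeping from the third call can be reproduced on its own. The sketch below mirrors the `legacyHashes` loop that fills `chainedOperatorHashes`, with these assumptions: string placeholders replace `byte[]` hashes, a hypothetical `HashPair` class replaces Flink's `Tuple2`, and the hash values are made up. Node C is recorded before Node B, which is the order implied by the code above: the third call returns before the second call reaches its own loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChainedOperatorHashesSketch {

    // Hypothetical stand-in for Flink's Tuple2<byte[], byte[]>: (primary hash, legacy hash)
    static final class HashPair {
        final String primary;
        final String legacy;

        HashPair(String primary, String legacy) {
            this.primary = primary;
            this.legacy = legacy;
        }

        @Override
        public String toString() {
            return "(" + primary + ", " + legacy + ")";
        }
    }

    // Mirrors the loop in createChain: one (primary, legacy) pair per legacy hash map,
    // appended to the list stored under the chain's start node
    static void recordHashes(Map<Integer, List<HashPair>> chainedOperatorHashes,
                             int startNodeId,
                             int currentNodeId,
                             Map<Integer, String> hashes,
                             List<Map<Integer, String>> legacyHashes) {
        List<HashPair> operatorHashes =
                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
        String primaryHash = hashes.get(currentNodeId);
        for (Map<Integer, String> legacyHash : legacyHashes) {
            operatorHashes.add(new HashPair(primaryHash, legacyHash.get(currentNodeId)));
        }
    }

    static Map<Integer, List<HashPair>> simulate() {
        // Made-up placeholder hashes for Node B (id 2) and Node C (id 3)
        Map<Integer, String> hashes = new HashMap<>();
        hashes.put(2, "B-hash1");
        hashes.put(3, "C-hash1");
        Map<Integer, String> legacy = new HashMap<>();
        legacy.put(2, "B-hash2");
        legacy.put(3, "C-hash2");
        List<Map<Integer, String>> legacyHashes = Collections.singletonList(legacy); // size 1

        Map<Integer, List<HashPair>> chainedOperatorHashes = new HashMap<>();
        // Third call (Node C) finishes first, then the second call (Node B) runs its own loop
        recordHashes(chainedOperatorHashes, 2, 3, hashes, legacyHashes);
        recordHashes(chainedOperatorHashes, 2, 2, hashes, legacyHashes);
        return chainedOperatorHashes;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

Running it prints `{2=[(C-hash1, C-hash2), (B-hash1, B-hash2)]}`. The output has a single key for the chain head and one pair per chained operator, because `legacyHashes` has size 1.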

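The core logic is as follows. Starting from the current StreamNode, keep traversing until reaching a node that cannot be chained with it (judging from the code, a StreamNode can always be chained with itself), and record the nodes that can be chained together. After recursively translating the current node's outputs, create one JobVertex from the recorded chainable StreamNodes. Finally, connect this JobVertex to the downstream JobVertices that were already translated.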
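The traversal described above can be sketched on a toy graph. This is a simplified model, not Flink's implementation. `Edge` and `ChainingSketch` are hypothetical classes, and chainability is a precomputed flag rather than a call to `isChainable`. A "JobVertex" here is just the list of member names, and a "JobEdge" is a string. The graph assumes the WordCount job becomes Source, FlatMap, Window, and Sink (ids 1 to 4), and that only Window -> Sink can be chained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ChainingSketch {

    // Toy edge; chainable is precomputed instead of calling isChainable
    static final class Edge {
        final int source;
        final int target;
        final boolean chainable;

        Edge(int source, int target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }
    }

    final Map<Integer, String> names = new HashMap<>();
    final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    final Map<Integer, List<String>> vertices = new LinkedHashMap<>(); // chain start id -> member names
    final List<String> jobEdges = new ArrayList<>();                   // "upstream chain => downstream chain"
    final Set<Integer> builtVertices = new HashSet<>();

    void addNode(int id, String name) {
        names.put(id, name);
        outEdges.put(id, new ArrayList<Edge>());
    }

    void addEdge(int from, int to, boolean chainable) {
        outEdges.get(from).add(new Edge(from, to, chainable));
    }

    // Returns the edges that leave the chain (the "transitiveOutEdges")
    List<Edge> createChain(int startNodeId, int currentNodeId) {
        List<Edge> transitiveOutEdges = new ArrayList<>();
        if (builtVertices.contains(startNodeId)) {
            return transitiveOutEdges; // this chain was already translated
        }
        if (currentNodeId == startNodeId) {
            vertices.put(startNodeId, new ArrayList<String>());
        }
        vertices.get(startNodeId).add(names.get(currentNodeId));

        for (Edge e : outEdges.get(currentNodeId)) {
            if (e.chainable) { // stay in the same chain
                transitiveOutEdges.addAll(createChain(startNodeId, e.target));
            }
        }
        for (Edge e : outEdges.get(currentNodeId)) {
            if (!e.chainable) { // edge leaves the chain; its target starts a new chain
                transitiveOutEdges.add(e);
                createChain(e.target, e.target);
            }
        }
        if (currentNodeId == startNodeId) {
            builtVertices.add(startNodeId);
            // Like connect(): every edge leaving the chain becomes a JobEdge
            for (Edge e : transitiveOutEdges) {
                jobEdges.add(names.get(startNodeId) + " => " + names.get(e.target));
            }
        }
        return transitiveOutEdges;
    }

    static ChainingSketch wordCountExample() {
        ChainingSketch g = new ChainingSketch();
        g.addNode(1, "Source");
        g.addNode(2, "FlatMap");
        g.addNode(3, "Window");
        g.addNode(4, "Sink");
        g.addEdge(1, 2, false); // assumed: different parallelism
        g.addEdge(2, 3, false); // keyBy: hash partitioning
        g.addEdge(3, 4, true);  // assumed: forward edge, same parallelism
        g.createChain(1, 1);
        return g;
    }

    public static void main(String[] args) {
        ChainingSketch g = wordCountExample();
        for (List<String> members : g.vertices.values()) {
            System.out.println("JobVertex: " + String.join(" -> ", members));
        }
        System.out.println("JobEdges: " + g.jobEdges);
    }
}
```

Running `main` prints three JobVertices (`Source`, `FlatMap`, `Window -> Sink`) and the JobEdges `[FlatMap => Window, Source => FlatMap]`. The edges come out downstream first because, as in the real code, they are connected only after the recursion into the outputs has returned.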

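As you can see, the main difference between the JobGraph and the StreamGraph is that chainable StreamNodes are merged into a single JobVertex, and JobVertices are connected by JobEdges (physical edges). Operators chained into one JobVertex run inside the same task and hand records to each other by direct method calls instead of shipping them between tasks, which is the main optimization the JobGraph applies to the StreamGraph.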
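The loops above call `isChainable(outEdge, streamGraph)` to decide whether two StreamNodes are merged into one JobVertex. Below is a simplified, hypothetical version of that predicate. It covers several conditions Flink checks: the downstream node has exactly one input, both nodes share a slot sharing group, the edge uses forward partitioning, the parallelism matches, and chaining is enabled (it can be switched off with `StreamExecutionEnvironment#disableOperatorChaining()`). The real method also considers the operators' chaining strategies, which this sketch leaves out. The `Node` class, the `Partitioning` enum, and the parallelism values (1 for the socket source, 4 elsewhere) are assumptions for the WordCount example.

```java
class IsChainableSketch {

    // How records are routed from an upstream node to a downstream node
    enum Partitioning { FORWARD, REBALANCE, HASH }

    // Hypothetical, simplified view of a StreamNode: only the fields the predicate reads
    static final class Node {
        final String name;
        final int parallelism;
        final String slotSharingGroup;
        final int inEdgeCount;

        Node(String name, int parallelism, String slotSharingGroup, int inEdgeCount) {
            this.name = name;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
            this.inEdgeCount = inEdgeCount;
        }
    }

    // Simplified chaining check; chaining strategies are deliberately ignored here
    static boolean isChainable(Node up, Node down, Partitioning partitioning, boolean chainingEnabled) {
        return down.inEdgeCount == 1                             // single input downstream
                && up.slotSharingGroup.equals(down.slotSharingGroup)
                && partitioning == Partitioning.FORWARD          // records stay in the same subtask
                && up.parallelism == down.parallelism
                && chainingEnabled;
    }

    public static void main(String[] args) {
        // Assumed parallelism: 1 for the socket source, 4 for everything else
        Node source = new Node("Source", 1, "default", 0);
        Node flatMap = new Node("FlatMap", 4, "default", 1);
        Node window = new Node("Window", 4, "default", 1);
        Node sink = new Node("Sink", 4, "default", 1);

        System.out.println(isChainable(source, flatMap, Partitioning.REBALANCE, true)); // false
        System.out.println(isChainable(flatMap, window, Partitioning.HASH, true));      // false
        System.out.println(isChainable(window, sink, Partitioning.FORWARD, true));      // true
    }
}
```

Under these assumptions, `keyBy` breaks the chain between FlatMap and Window, and the parallelism change breaks it after the source. Only Window and Sink end up in the same JobVertex.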

The overall structure of the resulting JobGraph is shown below:

[Figure: JobGraph]

4. JobGraph Submission Steps

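Finally, let's look at how the job is submitted. Submission eventually calls ClusterClient#submitJob(), which is an abstract method. Taking MiniClusterClient as an example, it calls MiniClusterClient#requestJobResult. I'll skip the path from the client to the JobManager for now and cover it in detail in a later post. On the JobManager side, JobManagerRunner#grantLeadership() is eventually called, and the call chain after that is
verifyJobSchedulingStatusAndStartJobManager() -> startJobMaster() -> JobMasterService#start() -> JobMaster#startJobExecution() -> JobMaster#resetAndStartScheduler() -> JobMaster#startScheduling()
At this point the job finally starts being scheduled for execution; this part will be explained in a later post.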
