
Flink - StreamJob

 

Let's start with the simplest possible example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Long, Long>> stream = env.addSource(...);
stream
    .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {...})
    .addSink(new SinkFunction<Tuple2<Long, Long>>() {...});
env.execute();

 

DataStream

env.addSource

The first step is to create the source:

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

    if (typeInfo == null) { // no typeInfo was given, so infer the type
        if (function instanceof ResultTypeQueryable) {
            typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        } else {
            try {
                typeInfo = TypeExtractor.createTypeInfo(
                        SourceFunction.class,
                        function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
            }
        }
    }

    boolean isParallel = function instanceof ParallelSourceFunction;

    clean(function);
    StreamSource<OUT, ?> sourceOperator;
    if (function instanceof StoppableFunction) {
        sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
    } else {
        sourceOperator = new StreamSource<>(function); // wrap the SourceFunction in a StreamSource
    }

    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); // wrap the StreamSource in a DataStreamSource
}

 

StreamSource is a kind of StreamOperator; its core logic lives in run:

public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

    private transient SourceFunction.SourceContext<OUT> ctx; // used to collect the output

    private transient volatile boolean canceledOrStopped = false;

    public StreamSource(SRC sourceFunction) {
        super(sourceFunction);
        this.chainingStrategy = ChainingStrategy.HEAD; // a source can only be the head of a chain
    }

    public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        LatencyMarksEmitter latencyEmitter = null; // latency-marker logic
        if (getExecutionConfig().isLatencyTrackingEnabled()) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                getExecutionConfig().getLatencyTrackingInterval(),
                getOperatorConfig().getVertexID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval);

        try {
            userFunction.run(ctx); // run the user's SourceFunction; an unbounded source keeps emitting and this call never returns

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK); // emit the maximum watermark
            }
        } finally {
        }
    }

 

However, addSource has to return a DataStream, so the StreamSource is wrapped in a DataStreamSource:

public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

    boolean isParallel;

    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

        this.isParallel = isParallel;
        if (!isParallel) {
            setParallelism(1);
        }
    }

SourceTransformation can be thought of as a wrapper around the StreamOperator.

public class SingleOutputStreamOperator<T> extends DataStream<T> {

    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        super(environment, transformation);
    }

DataStream, in turn, is a wrapper around a StreamTransformation.

The name SingleOutputStreamOperator is baffling: the class extends DataStream, yet it is called an Operator.

 

 

map

In DataStream:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);

    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

 

Here StreamMap is the StreamOperator:

public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS; // a map can always be chained
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue()))); // run the MapFunction and replace the value of the existing element
    }
}

 

map then calls transform:

public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

As you can see, there are two layers of wrapping here: operator -> transformation -> DataStream.

Finally, getExecutionEnvironment().addOperator(resultTransform) is called:

protected final List<StreamTransformation<?>> transformations = new ArrayList<>();

public void addOperator(StreamTransformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}

This registers the StreamTransformation in the transformations list, which is used later when the StreamGraph is generated.
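The two layers of wrapping and the registry can be mimicked with a small, self-contained model. This is only a sketch and not Flink code: the classes WrappingSketch, Transformation, Env and Stream are hypothetical stand-ins for StreamTransformation, StreamExecutionEnvironment and DataStream, and operators are reduced to plain names. It illustrates that map and addSink register their transformation, while the source is only reachable through the input references.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Flink code: operator -> transformation -> stream wrapper. */
class WrappingSketch {

    /** Stand-in for StreamTransformation: wraps an operator name and remembers its input. */
    static final class Transformation {
        final String operatorName;
        final Transformation input; // null for a source

        Transformation(String operatorName, Transformation input) {
            this.operatorName = operatorName;
            this.input = input;
        }
    }

    /** Stand-in for StreamExecutionEnvironment: it only holds the registry. */
    static final class Env {
        final List<Transformation> transformations = new ArrayList<>();

        Stream addSource(String name) {
            // As in addSource above, the source is wrapped but not registered.
            return new Stream(this, new Transformation(name, null));
        }
    }

    /** Stand-in for DataStream: a thin wrapper around the newest transformation. */
    static final class Stream {
        final Env env;
        final Transformation transformation;

        Stream(Env env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        Stream map(String name) {
            Transformation result = new Transformation(name, transformation);
            env.transformations.add(result); // mirrors addOperator(resultTransform)
            return new Stream(env, result);
        }

        void addSink(String name) {
            env.transformations.add(new Transformation(name, transformation));
        }
    }

    /** Names of the registered transformations, in registration order. */
    static List<String> registeredNames(Env env) {
        List<String> names = new ArrayList<>();
        for (Transformation t : env.transformations) {
            names.add(t.operatorName);
        }
        return names;
    }

    public static void main(String[] args) {
        Env env = new Env();
        env.addSource("source").map("map").addSink("sink");
        System.out.println(registeredNames(env)); // the source is absent from the registry
    }
}
```

In this model the registry holds only the map and the sink; the source can still be found by following the input references backwards from either of them.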

 

sink

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    // configure the type if needed
    if (sinkFunction instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
    }

    StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}

 

StreamSink is the operator:

public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
        implements OneInputStreamOperator<IN, Object> {

    public StreamSink(SinkFunction<IN> sinkFunction) {
        super(sinkFunction);
        chainingStrategy = ChainingStrategy.ALWAYS; // a sink can always be chained as well
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        userFunction.invoke(element.getValue());
    }

    @Override
    protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
        // all operators are tracking latencies
        this.latencyGauge.reportLatency(maker, true);

        // sinks don't forward latency markers
    }
}

 

DataStreamSink is not a DataStream. It is a peer class of DataStream, because its job is likewise to wrap a transformation, in this case the SinkTransformation:

public class DataStreamSink<T> {

    SinkTransformation<T> transformation;

    @SuppressWarnings("unchecked")
    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
    }

In the end it is registered with the execution environment too:

getExecutionEnvironment().addOperator(sink.getTransformation());

 

So the DataStream calls ultimately build up a tree of StreamTransformations.

 

StreamGraph

Next, execution starts:

env.execute

public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    transformations.clear();
    return executeRemotely(streamGraph);
}

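execute first calls getStreamGraph, which in turn calls StreamGraphGenerator.generate.

The argument passed in is the transformations list from before, in which every operator and sink has been registered: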

public StreamGraph getStreamGraph() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    return StreamGraphGenerator.generate(this, transformations);
}

 

StreamGraphGenerator

public class StreamGraphGenerator {

    // The StreamGraph that is being built, this is initialized at the beginning.
    private StreamGraph streamGraph;

    private final StreamExecutionEnvironment env;

    // Keep track of which Transforms we have already transformed, this is necessary because
    // we have loops, i.e. feedback edges.
    private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed; // remembers what has been transformed already, to guard against cycles

    /**
     * Private constructor. The generator should only be invoked using {@link #generate}.
     */
    private StreamGraphGenerator(StreamExecutionEnvironment env) {
        this.streamGraph = new StreamGraph(env);
        this.streamGraph.setChaining(env.isChainingEnabled());
        this.streamGraph.setStateBackend(env.getStateBackend());
        this.env = env;
        this.alreadyTransformed = new HashMap<>();
    }

    /**
     * Generates a {@code StreamGraph} by traversing the graph of {@code StreamTransformations}
     * starting from the given transformations.
     *
     * @param env The {@code StreamExecutionEnvironment} that is used to set some parameters of the
     *            job
     * @param transformations The transformations starting from which to transform the graph
     *
     * @return The generated {@code StreamGraph}
     */
    public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
        return new StreamGraphGenerator(env).generateInternal(transformations);
    }

    /**
     * This starts the actual transformation, beginning from the sinks.
     */
    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation: transformations) {
            transform(transformation);
        }
        return streamGraph;
    }

transform is called for every StreamTransformation:

private Collection<Integer> transform(StreamTransformation<?> transform) {

    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform); // already transformed, return the cached ids
    }

    Collection<Integer> transformedIds;
    if (transform instanceof OneInputTransformation<?, ?>) {
        transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
        transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    } else if (transform instanceof SourceTransformation<?>) {
        transformedIds = transformSource((SourceTransformation<?>) transform);
    } else if (transform instanceof SinkTransformation<?>) {
        transformedIds = transformSink((SinkTransformation<?>) transform);
    } else if (transform instanceof UnionTransformation<?>) {
        transformedIds = transformUnion((UnionTransformation<?>) transform);
    } else if (transform instanceof SplitTransformation<?>) {
        transformedIds = transformSplit((SplitTransformation<?>) transform);
    } else if (transform instanceof SelectTransformation<?>) {
        transformedIds = transformSelect((SelectTransformation<?>) transform);
    } else if (transform instanceof FeedbackTransformation<?>) {
        transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
    } else if (transform instanceof CoFeedbackTransformation<?>) {
        transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
    } else if (transform instanceof PartitionTransformation<?>) {
        transformedIds = transformPartition((PartitionTransformation<?>) transform);
    } else {
        throw new IllegalStateException("Unknown transformation: " + transform);
    }

    // record the result so that later calls hit the check at the top
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    return transformedIds;
}

The example above uses OneInputTransformation, SourceTransformation and SinkTransformation.

transformOnInputTransform

/**
 * Transforms a {@code OneInputTransformation}.
 *
 * <p>
 * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
 * wires the inputs to this new node.
 */
private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {

    Collection<Integer> inputIds = transform(transform.getInput()); // recursive call; this is why the source did not need to be added to transformations earlier, the recursion reaches it

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform); // already transformed, return directly
    }

    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds); // determine the slotSharingGroup

    streamGraph.addOperator(transform.getId(), // addOperator
            slotSharingGroup,
            transform.getOperator(),
            transform.getInputType(),
            transform.getOutputType(),
            transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    streamGraph.setParallelism(transform.getId(), transform.getParallelism());
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0); // addEdge
    }

    return Collections.singleton(transform.getId());
}

What does the transform id represent?

public abstract class StreamTransformation<T> {

    // This is used to assign a unique ID to every StreamTransformation
    protected static Integer idCounter = 0;

    public static int getNewNodeId() {
        idCounter++;
        return idCounter;
    }

    protected final int id;

    public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
        this.id = getNewNodeId();

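As you can see, the id is an auto-incrementing value whose counter starts at 0. It is incremented before being returned, so the first transformation gets id 1.

The counter is a static field of the class, so the ids depend on the order in which StreamTransformation objects are created.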
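The increment-then-return behaviour can be checked in isolation. The following is a minimal sketch, not Flink code: IdCounterSketch and Node are hypothetical names, and Node only copies the counter logic of StreamTransformation shown above.

```java
/** Toy model, not Flink code: ids handed out by a class-wide counter. */
class IdCounterSketch {

    /** Stand-in for StreamTransformation and its static idCounter. */
    static final class Node {
        private static int idCounter = 0;

        static int getNewNodeId() {
            idCounter++;      // increment first ...
            return idCounter; // ... then return, so the very first id handed out is 1
        }

        final int id;
        final String name;

        Node(String name) {
            this.id = getNewNodeId();
            this.name = name;
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node map = new Node("map");
        Node sink = new Node("sink");
        // Ids follow creation order because the counter is shared by all instances.
        System.out.println(source.id + " " + map.id + " " + sink.id);
    }
}
```

Because the counter is shared, anything else that calls getNewNodeId (for example the virtual nodes discussed later) also consumes ids, so ids of real nodes are unique but not necessarily contiguous.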

 

slotSharingGroup is just a name here, hence a String:

public abstract class StreamTransformation<T> {

    private String slotSharingGroup;

    public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
        this.slotSharingGroup = null;

By default slotSharingGroup is null, i.e. not set.

 

It can be set on both DataStreamSink and SingleOutputStreamOperator:

/**
 * Sets the slot sharing group of this operation. Parallel instances of
 * operations that are in the same slot sharing group will be co-located in the same
 * TaskManager slot, if possible.
 *
 * <p>Operations inherit the slot sharing group of input operations if all input operations
 * are in the same slot sharing group and no slot sharing group was explicitly specified.
 *
 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
 * the default group explicitly by setting the slot sharing group to {@code "default"}.
 *
 * @param slotSharingGroup The slot sharing group name.
 */
@PublicEvolving
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
    transformation.setSlotSharingGroup(slotSharingGroup);
    return this;
}

Users can set this directly through the API:

someStream.filter(...).slotSharingGroup("group1")

 

determineSlotSharingGroup

/**
 * Determines the slot sharing group for an operation based on the slot sharing group set by
 * the user and the slot sharing groups of the inputs.
 *
 * <p>If the user specifies a group name, this is taken as is. If nothing is specified and
 * the input operations all have the same group name then this name is taken. Otherwise the
 * default group is chosen.
 *
 * @param specifiedGroup The group specified by the user.
 * @param inputIds The IDs of the input operations.
 */
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
    if (specifiedGroup != null) { // an explicit user setting always wins
        return specifiedGroup;
    } else {
        String inputGroup = null;
        for (int id: inputIds) { // infer the group from the inputs' slot sharing groups
            String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
            if (inputGroup == null) {
                inputGroup = inputGroupCandidate; // initialize with the first input
            } else if (!inputGroup.equals(inputGroupCandidate)) { // if all inputs share one group, use it; otherwise fall back to "default"
                return "default";
            }
        }
        return inputGroup == null ? "default" : inputGroup; // no inputs: use "default"
    }
}

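If the user specifies nothing anywhere, every operator ends up in the "default" slot sharing group.

If the user does specify a group, that setting wins; an operator without an explicit group inherits the group of its inputs when they all agree.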
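The resolution rule is easy to exercise on its own. The sketch below re-implements the same decision logic outside of Flink; SlotSharingSketch, determine and the groupOfNode lookup map are hypothetical names that stand in for determineSlotSharingGroup and streamGraph.getSlotSharingGroup(id).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Standalone sketch of the slot-sharing-group resolution rule (not Flink code). */
class SlotSharingSketch {

    /**
     * @param specifiedGroup group set by the user, or null
     * @param inputIds       ids of the input nodes
     * @param groupOfNode    hypothetical lookup table: node id -> already resolved group
     */
    static String determine(String specifiedGroup,
                            Collection<Integer> inputIds,
                            Map<Integer, String> groupOfNode) {
        if (specifiedGroup != null) {
            return specifiedGroup; // an explicit setting always wins
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate; // first input seen
            } else if (!inputGroup.equals(candidate)) {
                return "default"; // the inputs disagree
            }
        }
        return inputGroup == null ? "default" : inputGroup;
    }

    public static void main(String[] args) {
        Map<Integer, String> groups = new HashMap<>();
        groups.put(1, "group1");
        groups.put(2, "group1");
        groups.put(3, "group2");

        System.out.println(determine(null, Arrays.asList(1, 2), groups));   // inputs agree
        System.out.println(determine(null, Arrays.asList(1, 3), groups));   // inputs disagree
        System.out.println(determine("mine", Arrays.asList(1, 3), groups)); // explicit setting
        System.out.println(determine(null, Collections.<Integer>emptyList(), groups)); // no inputs
    }
}
```

The four calls in main cover the four cases of the rule: inherited group, conflicting inputs, explicit setting, and a node without inputs such as a source.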

 

streamGraph.addOperator

public <IN, OUT> void addOperator(
        Integer vertexID,
        String slotSharingGroup,
        StreamOperator<OUT> operatorObject,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {

    if (operatorObject instanceof StoppableStreamSource) {
        addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
    } else if (operatorObject instanceof StreamSource) {
        addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
    } else {
        addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
    }

The vertexID argument is simply transform.getId().

protected StreamNode addNode(Integer vertexID,
    String slotSharingGroup,
    Class<? extends AbstractInvokable> vertexClass,
    StreamOperator<?> operatorObject,
    String operatorName) {

    if (streamNodes.containsKey(vertexID)) { // this vertexID already exists
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    StreamNode vertex = new StreamNode(environment,
        vertexID,
        slotSharingGroup,
        operatorObject,
        operatorName,
        new ArrayList<OutputSelector<?>>(),
        vertexClass);

    streamNodes.put(vertexID, vertex);

    return vertex;
}

A StreamNode is essentially a wrapper around a Transformation.

The difference is that not every Transformation produces a StreamNode.

 

streamGraph.addEdge

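Among transformations, the relationships are expressed by each transformation recursively holding a reference to its input transformation.

Here an explicit edge abstraction is added: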

streamGraph.addEdge(inputId, transform.getId(), 0);

public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
    addEdgeInternal(upStreamVertexID,
            downStreamVertexID,
            typeNumber,
            null,
            new ArrayList<String>());
}

 

private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames) {

    if (virtualSelectNodes.containsKey(upStreamVertexID)) { // the upstream is a virtual select node
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0; // not a real node, so use the virtual node's parent as the upstream
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1; // turn the virtual select node into outputNames
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); // recursive call
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1; // turn the virtual partition node into a partitioner
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames); // recursive call
    } else {
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) { // key logic: choose the default partitioner
            partitioner = new ForwardPartitioner<Object>(); // same parallelism: forward
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>(); // different parallelism: rebalance
        }

        if (partitioner instanceof ForwardPartitioner) { // forward was requested but the parallelism differs: throw
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner); // create the StreamEdge

        getStreamNode(edge.getSourceId()).addOutEdge(edge); // connect the upstream and downstream StreamNodes through the StreamEdge
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}

As you can see, virtual nodes such as select and partition are folded into the StreamEdge and never produce a real StreamNode.
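To make the folding of virtual nodes concrete, here is a reduced model of the partition branch and the default-partitioner choice. It is a sketch under simplifying assumptions rather than Flink code: VirtualEdgeSketch and VirtualNode are hypothetical names, partitioners are plain strings, select nodes are left out, and an edge is recorded as text.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model, not Flink code: virtual partition nodes are folded into edges. */
class VirtualEdgeSketch {

    /** A virtual node only remembers the real upstream id and a partitioner name. */
    static final class VirtualNode {
        final int originalId;
        final String partitioner;

        VirtualNode(int originalId, String partitioner) {
            this.originalId = originalId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, Integer> parallelismOfNode = new HashMap<>();
    final Map<Integer, VirtualNode> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();

    void addNode(int id, int parallelism) {
        parallelismOfNode.put(id, parallelism);
    }

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualPartitionNodes.put(virtualId, new VirtualNode(originalId, partitioner));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, null);
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, String partitioner) {
        VirtualNode virtual = virtualPartitionNodes.get(upstreamId);
        if (virtual != null) {
            // Replace the virtual node by its real parent and carry the partitioner along.
            if (partitioner == null) {
                partitioner = virtual.partitioner;
            }
            addEdgeInternal(virtual.originalId, downstreamId, partitioner);
            return;
        }
        int up = parallelismOfNode.get(upstreamId);
        int down = parallelismOfNode.get(downstreamId);
        if (partitioner == null) {
            // Default: forward when the parallelism matches, rebalance otherwise.
            partitioner = (up == down) ? "FORWARD" : "REBALANCE";
        }
        if (partitioner.equals("FORWARD") && up != down) {
            throw new UnsupportedOperationException(
                    "Forward partitioning does not allow change of parallelism");
        }
        edges.add(upstreamId + "->" + downstreamId + ":" + partitioner);
    }

    public static void main(String[] args) {
        VirtualEdgeSketch graph = new VirtualEdgeSketch();
        graph.addNode(1, 2);
        graph.addNode(2, 2);
        graph.addVirtualPartitionNode(1, 10, "HASH"); // virtual node 10 sits on top of node 1
        graph.addEdge(10, 2);                         // recorded as an edge from node 1
        System.out.println(graph.edges);
    }
}
```

The virtual id 10 never shows up in the recorded edges; only the partitioner it carried survives, attached to the edge between the two real nodes.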

The following diagram illustrates this:

/**
 * The following graph of {@code StreamTransformations}:
 *
 * <pre>{@code
 *   Source              Source
 *      +                   +
 *      |                   |
 *      v                   v
 *  Rebalance          HashPartition
 *      +                   +
 *      |                   |
 *      |                   |
 *      +------>Union<------+
 *                +
 *                |
 *                v
 *              Split
 *                +
 *                |
 *                v
 *              Select
 *                +
 *                v
 *               Map
 *                +
 *                |
 *                v
 *              Sink
 * }</pre>
 *
 * Would result in this graph of operations at runtime:
 *
 * <pre>{@code
 *  Source              Source
 *    +                   +
 *    |                   |
 *    |                   |
 *    +------->Map<-------+
 *              +
 *              |
 *              v
 *             Sink
 * }</pre>
 */

 

SourceTransformation and SinkTransformation are handled in much the same way, so they are not covered in detail.

Let's look at how virtual nodes are handled:

transformPartition

private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    StreamTransformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();

    Collection<Integer> transformedIds = transform(input); // recursively transform the parent and collect its ids
    for (Integer transformedId: transformedIds) {
        int virtualId = StreamTransformation.getNewNodeId(); // allocate an id for the virtual node
        streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner()); // only registered as a virtual partition node; no real StreamNode is created
        resultIds.add(virtualId);
    }

    return resultIds;
}

 

transformUnion

private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
    List<StreamTransformation<T>> inputs = union.getInputs();
    List<Integer> resultIds = new ArrayList<>();

    for (StreamTransformation<T> input: inputs) {
        resultIds.addAll(transform(input)); // recurse
    }

    return resultIds;
}

It simply merges the ids of its inputs.
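The combination of recursion, memoization and union flattening can be tried out with a few lines of standalone Java. This is a sketch, not Flink code: TransformSketch and its Transformation class are hypothetical, a transformation is either a union or an ordinary node, and edges are recorded as text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model, not Flink code: recursive traversal with memoization and union flattening. */
class TransformSketch {

    static final class Transformation {
        final int id;
        final boolean isUnion;
        final List<Transformation> inputs;

        Transformation(int id, boolean isUnion, Transformation... inputs) {
            this.id = id;
            this.isUnion = isUnion;
            this.inputs = Arrays.asList(inputs);
        }
    }

    final Map<Transformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<Integer> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();

    Collection<Integer> transform(Transformation t) {
        if (alreadyTransformed.containsKey(t)) {
            return alreadyTransformed.get(t); // visited before, reuse the ids
        }
        // Transform all inputs first; this is how unregistered sources are reached.
        List<Integer> inputIds = new ArrayList<>();
        for (Transformation input : t.inputs) {
            inputIds.addAll(transform(input));
        }
        Collection<Integer> resultIds;
        if (t.isUnion) {
            resultIds = inputIds; // a union creates no node, it just passes the ids through
        } else {
            nodes.add(t.id);
            for (int inputId : inputIds) {
                edges.add(inputId + "->" + t.id);
            }
            resultIds = Collections.singletonList(t.id);
        }
        alreadyTransformed.put(t, resultIds);
        return resultIds;
    }

    public static void main(String[] args) {
        Transformation source1 = new Transformation(1, false);
        Transformation source2 = new Transformation(2, false);
        Transformation union = new Transformation(3, true, source1, source2);
        Transformation map = new Transformation(4, false, union);
        Transformation sink = new Transformation(5, false, map);

        TransformSketch generator = new TransformSketch();
        // Only map and sink are "registered"; the rest is reached recursively.
        generator.transform(map);
        generator.transform(sink);
        System.out.println(generator.nodes + " " + generator.edges);
    }
}
```

In this model the union (id 3) never becomes a node: both sources are wired straight to the map, and the second visit of the map while handling the sink is answered from alreadyTransformed.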

 

JobGraph

 

env.execute

public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    transformations.clear();
    return executeRemotely(streamGraph);
}

Continuing from there:

executeRemotely

protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {

    ClusterClient client;
    // ... (creation of the client is elided in this excerpt)

    try {
        return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();
    }
    // ... (catch/finally blocks are elided in this excerpt)
}

 

ClusterClient.run

public JobSubmissionResult run(FlinkPlan compiledPlan,
        List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
    throws ProgramInvocationException
{
    JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
    return submitJob(job, classLoader);
}

 

private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
    JobGraph job;
    if (optPlan instanceof StreamingPlan) { // streaming job plan
        job = ((StreamingPlan) optPlan).getJobGraph();
        job.setSavepointRestoreSettings(savepointSettings);
    } else { // batch job plan
        JobGraphGenerator gen = new JobGraphGenerator(this.flinkConfig);
        job = gen.compileJobGraph((OptimizedPlan) optPlan);
    }

    for (URL jar : jarFiles) {
        try {
            job.addJar(new Path(jar.toURI())); // add the jar
        } catch (URISyntaxException e) {
            throw new RuntimeException("URL is invalid. This should not happen.", e);
        }
    }

    job.setClasspaths(classpaths); // add the classpaths

    return job;
}

 

In the streaming case this calls:

((StreamingPlan) optPlan).getJobGraph();

 

StreamGraph.getJobGraph

public JobGraph getJobGraph() {
    StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);

    return jobgraphGenerator.createJobGraph();
}

 

StreamingJobGraphGenerator.createJobGraph

 

public JobGraph createJobGraph() {

    jobGraph = new JobGraph(streamGraph.getJobName()); // create the JobGraph

    // make sure that all vertices start immediately
    jobGraph.setScheduleMode(ScheduleMode.EAGER); // a streaming job needs all vertices started right away; the alternative mode, LAZY_FROM_SOURCES, only creates a task once its input is ready

    init(); // simply instantiates the internal data structures

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph); // a unique hash per node, so the node can be identified across submissions; returns the node id -> hash mapping

    // (the computation of legacyHashes is not shown in this excerpt)
    setChaining(hashes, legacyHashes); // core logic: creates the JobVertex and JobEdge objects

    setPhysicalEdges(); // only writes each vertex's in-edge information into the StreamConfig of that vertex

    setSlotSharing();

    configureCheckpointing();

    // set the ExecutionConfig last when it has been finalized
    jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());

    return jobGraph;
}

 

setChaining

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0);
    }
}

createChain is called for every source:

private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex) {

    if (!builtVertices.contains(startNodeId)) {

        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>(); // the StreamEdges that will eventually become JobEdges

        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) { // iterate over all out-edges of the current node
            if (isChainable(outEdge, streamGraph)) { // core logic: can this edge be chained?
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) { // chainable edges: keep recursing within the same chain
            transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1)); // currentNodeId becomes the target node's id and chainIndex grows by 1
        }

        for (StreamEdge nonChainable : nonChainableOutputs) { // non-chainable edges
            transitiveOutEdges.add(nonChainable); // not chained, so a real JobEdge is needed; collect it in transitiveOutEdges
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0); // continue, with startNodeId and currentNodeId both set to the target id, because the target starts a new chain
        }

        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); // generate a name for each chain

        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes) // only the start node of a chain gets a JobVertex; the others just get an empty StreamConfig
                : new StreamConfig(new Configuration());

        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs); // copy the StreamNode's settings into the StreamConfig

        if (currentNodeId.equals(startNodeId)) { // this is the start node

            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge); // only the start node needs to connect edges
            }

            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {

            Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

            if (chainedConfs == null) {
                chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
            }
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }

        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

 

isChainable

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = edge.getSourceVertex(); // the node the StreamEdge starts from
    StreamNode downStreamVertex = edge.getTargetVertex(); // the node the StreamEdge points to

    StreamOperator<?> headOperator = upStreamVertex.getOperator();
    StreamOperator<?> outOperator = downStreamVertex.getOperator();

    return downStreamVertex.getInEdges().size() == 1 // the target has exactly one in-edge; with several inputs it would have to wait for the others and cannot be chained
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) // both are in the same slot sharing group
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS // the target's ChainingStrategy is ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) // the upstream's ChainingStrategy is HEAD or ALWAYS
            && (edge.getPartitioner() instanceof ForwardPartitioner) // the edge uses a ForwardPartitioner
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // upstream and downstream have the same parallelism
            && streamGraph.isChainingEnabled(); // chaining is enabled
}
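The chaining conditions can be restated as a small predicate over plain data. The sketch below is not Flink code: ChainableSketch and Node are hypothetical, the partitioner check is reduced to a boolean, and the null checks on the operators are omitted because the toy nodes always carry a strategy.

```java
/** Toy model, not Flink code: the conditions under which an edge can be chained. */
class ChainableSketch {

    enum ChainingStrategy { ALWAYS, NEVER, HEAD }

    static final class Node {
        final ChainingStrategy strategy;
        final int parallelism;
        final String slotSharingGroup;
        final int inEdgeCount;

        Node(ChainingStrategy strategy, int parallelism, String slotSharingGroup, int inEdgeCount) {
            this.strategy = strategy;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
            this.inEdgeCount = inEdgeCount;
        }
    }

    static boolean isChainable(Node upstream, Node downstream,
                               boolean forwardPartitioner, boolean chainingEnabled) {
        return downstream.inEdgeCount == 1                              // single input only
                && upstream.slotSharingGroup.equals(downstream.slotSharingGroup)
                && downstream.strategy == ChainingStrategy.ALWAYS       // target may join a chain
                && (upstream.strategy == ChainingStrategy.HEAD
                    || upstream.strategy == ChainingStrategy.ALWAYS)    // upstream may have followers
                && forwardPartitioner                                   // no redistribution on the edge
                && upstream.parallelism == downstream.parallelism
                && chainingEnabled;
    }

    public static void main(String[] args) {
        Node source = new Node(ChainingStrategy.HEAD, 1, "default", 0);
        Node map = new Node(ChainingStrategy.ALWAYS, 1, "default", 1);
        Node wideMap = new Node(ChainingStrategy.ALWAYS, 4, "default", 1);

        System.out.println(isChainable(source, map, true, true));      // all conditions hold
        System.out.println(isChainable(source, wideMap, false, true)); // parallelism differs, not forward
    }
}
```

A source with strategy HEAD can therefore start a chain but can never be appended to one, since the downstream side of an edge must be ALWAYS.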

 

createJobVertex

    private StreamConfig createJobVertex(
            Integer streamNodeId,
            Map<Integer, byte[]> hashes,
            List<Map<Integer, byte[]>> legacyHashes) {

        JobVertex jobVertex;
        StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);

        byte[] hash = hashes.get(streamNodeId); // the unique id computed for this StreamNode

        JobVertexID jobVertexId = new JobVertexID(hash); // build the JobVertexID from it

        // note: legacyJobVertexIds is derived from legacyHashes; that code is omitted in this excerpt

        if (streamNode.getInputFormat() != null) {
            jobVertex = new InputFormatVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,
                    legacyJobVertexIds);
            TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
            taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
        } else {
            jobVertex = new JobVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,
                    legacyJobVertexIds);
        }

        jobVertex.setInvokableClass(streamNode.getJobVertexClass());

        int parallelism = streamNode.getParallelism();

        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism); // set the parallelism
        } else {
            parallelism = jobVertex.getParallelism();
        }

        jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

        jobVertices.put(streamNodeId, jobVertex); // register the JobVertex in the builder's bookkeeping structures
        builtVertices.add(streamNodeId);
        jobGraph.addVertex(jobVertex);

        return new StreamConfig(jobVertex.getConfiguration());
    }

 

connect(startNodeId, edge)

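Only the transitiveOutEdges need to be connected.

Why "transitive"? For a group of chained nodes, only the head node gets a JobVertex, and a JobEdge is created only for the non-chainable edges.

As shown above, each recursive call to createChain returns all of its transitiveOutEdges. Because the later nodes in a chain have no JobVertex of their own, the non-chainable edges they connect to must also be attached to the head node, which can be seen as a form of transitivity.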
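To make the "transitive" idea concrete, here is a minimal sketch of collecting the out-edges of a single chain. It is a simplification, not Flink's createChain: the `Edge` class and the precomputed `chainable` flag are hypothetical, and the sketch does not start new chains at the targets of non-chainable edges the way the real method does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TransitiveEdgesSketch {
    // Hypothetical stand-in for StreamEdge; 'chainable' plays the role of isChainable(edge, graph).
    static final class Edge {
        final int source;
        final int target;
        final boolean chainable;

        Edge(int source, int target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }

        @Override
        public String toString() {
            return source + "->" + target;
        }
    }

    // Collects every non-chainable edge that leaves the chain containing nodeId.
    static List<Edge> transitiveOutEdges(int nodeId, Map<Integer, List<Edge>> outEdges) {
        List<Edge> result = new ArrayList<>();
        for (Edge edge : outEdges.getOrDefault(nodeId, Collections.<Edge>emptyList())) {
            if (edge.chainable) {
                // The target joins the same chain, so its outgoing edges are handed back to the head.
                result.addAll(transitiveOutEdges(edge.target, outEdges));
            } else {
                // The edge leaves the chain and would become a JobEdge on the head's JobVertex.
                result.add(edge);
            }
        }
        return result;
    }

    // Chain 1 -> 2 -> 3, with non-chainable edges 2 -> 4 and 3 -> 5.
    static Map<Integer, List<Edge>> exampleGraph() {
        Map<Integer, List<Edge>> graph = new HashMap<>();
        graph.put(1, Arrays.asList(new Edge(1, 2, true)));
        graph.put(2, Arrays.asList(new Edge(2, 3, true), new Edge(2, 4, false)));
        graph.put(3, Arrays.asList(new Edge(3, 5, false)));
        return graph;
    }

    public static void main(String[] args) {
        System.out.println(transitiveOutEdges(1, exampleGraph())); // [3->5, 2->4]
    }
}
```

Nodes 2 and 3 have no JobVertex of their own in this model, so both of their outgoing non-chainable edges end up on head node 1.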

    private void connect(Integer headOfChain, StreamEdge edge) {

        physicalEdgesInOrder.add(edge); // every edge passed to connect is a physical edge, i.e. it produces a JobEdge

        Integer downStreamvertexID = edge.getTargetId();

        JobVertex headVertex = jobVertices.get(headOfChain);
        JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

        downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1); // one more in-edge, so inputs + 1

        StreamPartitioner<?> partitioner = edge.getPartitioner();
        JobEdge jobEdge = null;
        if (partitioner instanceof ForwardPartitioner) {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.POINTWISE,
                ResultPartitionType.PIPELINED); // streaming always uses pipelined results: the consumer pulls data as soon as it is produced
        } else if (partitioner instanceof RescalePartitioner){
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.POINTWISE, // each producer subtask is connected to one or more consumer subtasks
                ResultPartitionType.PIPELINED);
        } else {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                    headVertex,
                    DistributionPattern.ALL_TO_ALL, // every producer subtask is connected to every consumer subtask
                    ResultPartitionType.PIPELINED);
        }
        // set strategy name so that web interface can show it.
        jobEdge.setShipStrategyName(partitioner.toString());
    }
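The difference between the two distribution patterns can be sketched as follows. This is an illustration under stated assumptions: `DistributionPatternSketch`, `PartitionerKind` and `producersFor` are hypothetical names, and the POINTWISE case only handles parallelisms where one is a multiple of the other. Flink's actual subtask wiring happens later, when the ExecutionGraph is built, and covers more cases.

```java
import java.util.ArrayList;
import java.util.List;

class DistributionPatternSketch {
    enum Pattern { POINTWISE, ALL_TO_ALL }

    // Hypothetical classification of partitioners, mirroring the branches in connect().
    enum PartitionerKind { FORWARD, RESCALE, OTHER }

    static Pattern patternFor(PartitionerKind kind) {
        return (kind == PartitionerKind.FORWARD || kind == PartitionerKind.RESCALE)
                ? Pattern.POINTWISE
                : Pattern.ALL_TO_ALL;
    }

    // Which producer subtasks does the given consumer subtask read from?
    static List<Integer> producersFor(Pattern pattern, int consumerIndex, int producers, int consumers) {
        List<Integer> result = new ArrayList<>();
        if (pattern == Pattern.ALL_TO_ALL) {
            for (int p = 0; p < producers; p++) {
                result.add(p); // every consumer reads from every producer
            }
        } else if (producers >= consumers) {
            if (producers % consumers != 0) {
                throw new IllegalArgumentException("sketch only handles evenly divisible parallelism");
            }
            int factor = producers / consumers;
            for (int p = consumerIndex * factor; p < (consumerIndex + 1) * factor; p++) {
                result.add(p); // each consumer reads from a contiguous block of producers
            }
        } else {
            if (consumers % producers != 0) {
                throw new IllegalArgumentException("sketch only handles evenly divisible parallelism");
            }
            int factor = consumers / producers;
            result.add(consumerIndex / factor); // several consumers share one producer
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(producersFor(patternFor(PartitionerKind.FORWARD), 2, 4, 4)); // [2]
        System.out.println(producersFor(patternFor(PartitionerKind.RESCALE), 3, 2, 4)); // [1]
        System.out.println(producersFor(patternFor(PartitionerKind.OTHER), 1, 2, 4));   // [0, 1]
    }
}
```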

 

downStreamVertex.connectNewDataSetAsInput

JobVertex.connectNewDataSetAsInput

    public JobEdge connectNewDataSetAsInput(
            JobVertex input,
            DistributionPattern distPattern,
            ResultPartitionType partitionType) {

        IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); // create an IntermediateDataSet and register it on the input vertex

        JobEdge edge = new JobEdge(dataSet, this, distPattern); // create the JobEdge
        this.inputs.add(edge); // the edge becomes an input of the current vertex
        dataSet.addConsumer(edge); // the edge reads its data from the IntermediateDataSet
        return edge;
    }
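The object wiring that connectNewDataSetAsInput sets up can be reproduced with a few plain classes. This is a sketch only: `Vertex`, `DataSet` and `Edge` below are hypothetical stand-ins for JobVertex, IntermediateDataSet and JobEdge, with the distribution pattern and partition type left out.

```java
import java.util.ArrayList;
import java.util.List;

class JobEdgeWiringSketch {
    // Hypothetical stand-in for IntermediateDataSet.
    static final class DataSet {
        final Vertex producer;
        final List<Edge> consumers = new ArrayList<>();

        DataSet(Vertex producer) {
            this.producer = producer;
        }
    }

    // Hypothetical stand-in for JobEdge.
    static final class Edge {
        final DataSet source;
        final Vertex target;

        Edge(DataSet source, Vertex target) {
            this.source = source;
            this.target = target;
        }
    }

    // Hypothetical stand-in for JobVertex.
    static final class Vertex {
        final String name;
        final List<DataSet> results = new ArrayList<>();
        final List<Edge> inputs = new ArrayList<>();

        Vertex(String name) {
            this.name = name;
        }

        // Called on the consumer: creates a new data set on the producer and links both sides.
        Edge connectNewDataSetAsInput(Vertex input) {
            DataSet dataSet = new DataSet(input);
            input.results.add(dataSet);
            Edge edge = new Edge(dataSet, this);
            this.inputs.add(edge);
            dataSet.consumers.add(edge);
            return edge;
        }
    }

    public static void main(String[] args) {
        Vertex source = new Vertex("source");
        Vertex map = new Vertex("map");
        Vertex sink = new Vertex("sink");

        map.connectNewDataSetAsInput(source);
        sink.connectNewDataSetAsInput(source);

        // Each connection creates its own data set on the producer.
        System.out.println(source.results.size()); // 2
        System.out.println(map.inputs.get(0).source.producer.name); // source
    }
}
```

Note that the method is invoked on the downstream vertex, so the producer gains one new result data set per consumer connection.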

setSlotSharing

    private void setSlotSharing() {

        Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();

        for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) { // iterate over every JobVertex

            String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();

            SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup);
            if (group == null) {
                group = new SlotSharingGroup(); // create the SlotSharingGroup on first use
                slotSharingGroups.put(slotSharingGroup, group);
            }
            entry.getValue().setSlotSharingGroup(group); // add the vertex to its SlotSharingGroup
        }

        for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) { // iterations additionally need a CoLocationGroup

            CoLocationGroup ccg = new CoLocationGroup();

            JobVertex source = jobVertices.get(pair.f0.getId());
            JobVertex sink = jobVertices.get(pair.f1.getId());

            ccg.addVertex(source);
            ccg.addVertex(sink);
            source.updateCoLocationGroup(ccg);
            sink.updateCoLocationGroup(ccg);
        }
    }
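The first loop of setSlotSharing is a "one shared object per name" lookup. A minimal sketch of that pattern follows, assuming a hypothetical `Group` class in place of SlotSharingGroup and a plain map from vertex id to group name in place of the StreamGraph lookup.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class SlotSharingSketch {
    // Hypothetical stand-in for SlotSharingGroup; identity is all that matters here.
    static final class Group {
    }

    // Vertices that declare the same group name receive the same Group instance.
    static Map<Integer, Group> assignGroups(Map<Integer, String> groupNameByVertex) {
        Map<String, Group> groupsByName = new HashMap<>();
        Map<Integer, Group> assigned = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> entry : groupNameByVertex.entrySet()) {
            // Create the group the first time its name is seen, then reuse it.
            Group group = groupsByName.computeIfAbsent(entry.getValue(), name -> new Group());
            assigned.put(entry.getKey(), group);
        }
        return assigned;
    }

    public static void main(String[] args) {
        Map<Integer, String> names = new LinkedHashMap<>();
        names.put(1, "default");
        names.put(2, "default");
        names.put(3, "heavy");

        Map<Integer, Group> assigned = assignGroups(names);
        System.out.println(assigned.get(1) == assigned.get(2)); // true
        System.out.println(assigned.get(1) == assigned.get(3)); // false
    }
}
```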

configureCheckpointing

    private void configureCheckpointing() {
        CheckpointConfig cfg = streamGraph.getCheckpointConfig();

        long interval = cfg.getCheckpointInterval();
        if (interval > 0) { // checkpointing is enabled; if no restart strategy is configured, default to fixedDelayRestart
            // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
            if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
                // if the user enabled checkpointing, the default number of exec retries is infinite.
                streamGraph.getExecutionConfig().setRestartStrategy(
                    RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
            }
        } else {
            // interval of max value means disable periodic checkpoint
            interval = Long.MAX_VALUE;
        }

        // collect the vertices that receive "trigger checkpoint" messages.
        // currently, these are all the sources
        List<JobVertexID> triggerVertices = new ArrayList<>();

        // collect the vertices that need to acknowledge the checkpoint
        // currently, these are all vertices
        List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size()); // every JobVertex has to ack

        // collect the vertices that receive "commit checkpoint" messages
        // currently, these are all vertices
        List<JobVertexID> commitVertices = new ArrayList<>();

        for (JobVertex vertex : jobVertices.values()) {
            if (vertex.isInputVertex()) { // a vertex without inputs, i.e. a source
                triggerVertices.add(vertex.getID()); // it receives the trigger message
            }
            commitVertices.add(vertex.getID());
            ackVertices.add(vertex.getID());
        }

        CheckpointingMode mode = cfg.getCheckpointingMode();

        boolean isExactlyOnce;
        if (mode == CheckpointingMode.EXACTLY_ONCE) { // checkpointing mode
            isExactlyOnce = true;
        } else if (mode == CheckpointingMode.AT_LEAST_ONCE) {
            isExactlyOnce = false;
        } else {
            throw new IllegalStateException("Unexpected checkpointing mode. " +
                "Did not expect there to be another checkpointing mode besides " +
                "exactly-once or at-least-once.");
        }

        JobSnapshottingSettings settings = new JobSnapshottingSettings(
                triggerVertices, ackVertices, commitVertices, interval,
                cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
                cfg.getMaxConcurrentCheckpoints(),
                externalizedCheckpointSettings,
                isExactlyOnce);

        jobGraph.setSnapshotSettings(settings);
    }
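The way configureCheckpointing sorts vertices into trigger, ack and commit lists, and how it treats a non-positive interval, can be sketched in isolation. `CheckpointVerticesSketch` and its `Vertex` class are hypothetical; the `hasInputs` flag is an assumption standing in for the inverse of `isInputVertex()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CheckpointVerticesSketch {
    // Hypothetical stand-in for JobVertex.
    static final class Vertex {
        final String id;
        final boolean hasInputs;

        Vertex(String id, boolean hasInputs) {
            this.id = id;
            this.hasInputs = hasInputs;
        }
    }

    // Only vertices without inputs (the sources) receive "trigger checkpoint" messages.
    static List<String> triggerVertices(List<Vertex> vertices) {
        List<String> result = new ArrayList<>();
        for (Vertex v : vertices) {
            if (!v.hasInputs) {
                result.add(v.id);
            }
        }
        return result;
    }

    // Every vertex acknowledges the checkpoint and receives the commit message.
    static List<String> ackAndCommitVertices(List<Vertex> vertices) {
        List<String> result = new ArrayList<>();
        for (Vertex v : vertices) {
            result.add(v.id);
        }
        return result;
    }

    // A non-positive interval disables periodic checkpoints by mapping to Long.MAX_VALUE.
    static long effectiveInterval(long configuredInterval) {
        return configuredInterval > 0 ? configuredInterval : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        List<Vertex> vertices = Arrays.asList(
                new Vertex("source", false),
                new Vertex("map", true),
                new Vertex("sink", true));

        System.out.println(triggerVertices(vertices));      // [source]
        System.out.println(ackAndCommitVertices(vertices)); // [source, map, sink]
        System.out.println(effectiveInterval(-1) == Long.MAX_VALUE); // true
    }
}
```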

 

At this point the JobGraph is complete.

Finally, the JobGraph is submitted to the JobManager.

 

References:

http://wuchong.me/blog/2016/05/04/flink-internal-how-to-build-streamgraph/

http://wuchong.me/blog/2016/05/10/flink-internals-how-to-build-jobgraph/
