
Flink - Checkpoint

Flink's most distinctive feature for stream processing is its global snapshot mechanism.

 

CheckpointCoordinator

The core component that takes snapshots is the CheckpointCoordinator.

/**
 * The checkpoint coordinator coordinates the distributed snapshots of operators and state.
 * It triggers the checkpoint by sending the messages to the relevant tasks and collects the
 * checkpoint acknowledgements. It also collects and maintains the overview of the state handles
 * reported by the tasks that acknowledge the checkpoint.
 *
 * <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link
 * CompletedCheckpointStore} and {@link CheckpointIDCounter} change. The default standalone
 * implementations don't support any recovery.
 */
public class CheckpointCoordinator {

    /** Tasks who need to be sent a message when a checkpoint is started */
    private final ExecutionVertex[] tasksToTrigger; // tasks on which a checkpoint must be triggered

    /** Tasks who need to acknowledge a checkpoint before it succeeds */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Tasks who need to be sent a message when a checkpoint is confirmed */
    private final ExecutionVertex[] tasksToCommitTo;

    /** Map from checkpoint ID to the pending checkpoint */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
     * accessing this don't block the job manager actor and run asynchronously. */
    private final CompletedCheckpointStore completedCheckpointStore; // records the completed checkpoints

    /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
    private final ArrayDeque<Long> recentPendingCheckpoints;

    /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
     * need to be ascending across job managers. */
    protected final CheckpointIDCounter checkpointIdCounter; // keeps checkpoint IDs ascending, even across a JobManager crash

    /** The base checkpoint interval. Actual trigger time may be affected by the
     * max concurrent checkpoints and minimum-pause values */
    private final long baseInterval; // interval between checkpoint triggers

    /** The max time (in ms) that a checkpoint may take */
    private final long checkpointTimeout; // a checkpoint that takes longer than this is treated as timed out and failed

    /** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    private final long minPauseBetweenCheckpoints; // minimum pause between checkpoints

    /** The maximum number of checkpoints that may be in progress at the same time */
    private final int maxConcurrentCheckpointAttempts; // maximum number of checkpoints in flight at once

    /** Actor that receives status updates from the execution graph this coordinator works for */
    private ActorGateway jobStatusListener;

    /** The number of consecutive failed trigger attempts */
    private int numUnsuccessfulCheckpointsTriggers;

    private ScheduledTrigger currentPeriodicTrigger;

    /** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
     * Non-volatile, because only accessed in synchronized scope */
    private boolean periodicScheduling;

    /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
     * accessed in synchronized scope */
    private boolean triggerRequestQueued;

    /** Flag marking the coordinator as shut down (not accepting any messages any more) */
    private volatile boolean shutdown; // note: volatile, to guarantee visibility across threads

    /** Shutdown hook thread to clean up state handles. */
    private final Thread shutdownHook;

    /** Helper for tracking checkpoint statistics  */
    private final CheckpointStatsTracker statsTracker;

    public CheckpointCoordinator(
            JobID job,
            long baseInterval,
            long checkpointTimeout,
            long minPauseBetweenCheckpoints,
            int maxConcurrentCheckpointAttempts,
            ExecutionVertex[] tasksToTrigger,
            ExecutionVertex[] tasksToWaitFor,
            ExecutionVertex[] tasksToCommitTo,
            ClassLoader userClassLoader,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore completedCheckpointStore,
            RecoveryMode recoveryMode,
            CheckpointStatsTracker statsTracker) throws Exception {

        checkpointIDCounter.start(); // start the CheckpointIDCounter

        this.timer = new Timer("Checkpoint Timer", true);

        this.statsTracker = checkNotNull(statsTracker);

        if (recoveryMode == RecoveryMode.STANDALONE) { // in standalone mode, a shutdown hook is needed to clean up state
            // Add shutdown hook to clean up state handles when no checkpoint recovery is
            // possible. In case of another configured recovery mode, the checkpoints need to be
            // available for the standby job managers.
            this.shutdownHook = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        CheckpointCoordinator.this.shutdown(); // explicitly call shutdown
                    }
                    catch (Throwable t) {
                        LOG.error("Error during shutdown of checkpoint coordinator via " +
                                "JVM shutdown hook: " + t.getMessage(), t);
                    }
                }
            });

            try {
                // Add JVM shutdown hook to call shutdown of service
                Runtime.getRuntime().addShutdownHook(shutdownHook);
            }
            catch (IllegalStateException ignored) {
                // JVM is already shutting down. No need to do anything.
            }
            catch (Throwable t) {
                LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
            }
        }
        else {
            this.shutdownHook = null;
        }
    }

 

CheckpointIDCounter

There are two implementations:

StandaloneCheckpointIDCounter

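In this case the counter is simply backed by an AtomicLong. If the JobManager dies, the value is lost, so ascending IDs cannot be guaranteed after a restart.

However, the Javadoc notes that standalone mode does not support recovery, so this is acceptable.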

/**
 * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#STANDALONE}.
 *
 * <p>Simple wrapper of an {@link AtomicLong}. This is sufficient, because job managers are not
 * recoverable in this recovery mode.
 */
public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {

    private final AtomicLong checkpointIdCounter = new AtomicLong(1);

    @Override
    public void start() throws Exception {
    }

    @Override
    public void stop() throws Exception {
    }

    @Override
    public long getAndIncrement() throws Exception {
        return checkpointIdCounter.getAndIncrement();
    }

    @Override
    public void setCount(long newCount) {
        checkpointIdCounter.set(newCount);
    }
}
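
To make the restart problem concrete, here is a minimal sketch of an in-memory counter losing its value. `InMemoryIdCounter`, `InMemoryIdCounterDemo` and `simulateRestart` are hypothetical names used only for illustration; they are not Flink classes, and the "restart" is simulated by creating a fresh object.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an in-memory checkpoint ID counter (not a Flink class).
class InMemoryIdCounter {
    private final AtomicLong counter = new AtomicLong(1);

    long getAndIncrement() {
        return counter.getAndIncrement();
    }

    void setCount(long newCount) {
        counter.set(newCount);
    }
}

class InMemoryIdCounterDemo {
    // Simulates a process restart: the old counter object is gone,
    // and a fresh one starts again from 1.
    static InMemoryIdCounter simulateRestart() {
        return new InMemoryIdCounter();
    }

    public static void main(String[] args) {
        InMemoryIdCounter counter = new InMemoryIdCounter();
        long first = counter.getAndIncrement();
        long second = counter.getAndIncrement();

        // After the simulated restart the sequence begins again,
        // so IDs are no longer ascending across restarts.
        long afterRestart = simulateRestart().getAndIncrement();

        System.out.println(first + ", " + second + ", after restart: " + afterRestart);
    }
}
```

A counter that must survive restarts needs its value stored outside the process, which is what the ZooKeeper-based variant does.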

 

ZooKeeperCheckpointIDCounter

This counter stores the current count in a persistent ZooKeeper node, which keeps the IDs ascending across JobManager instances.

/**
 * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
 *
 * <p>Each counter creates a ZNode:
 * <pre>
 * +----O /flink/checkpoint-counter/&lt;job-id&gt; 1 [persistent]
 * .
 * .
 * .
 * +----O /flink/checkpoint-counter/&lt;job-id&gt; N [persistent]
 * </pre>
 *
 * <p>The checkpoints IDs are required to be ascending (per job). In order to guarantee this in case
 * of job manager failures we use ZooKeeper to have a shared counter across job manager instances.
 */
public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter

 

CompletedCheckpointStore

An interface that records which checkpoints have completed.

/**
 * A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
 */
public interface CompletedCheckpointStore {

    /**
     * Recover available {@link CompletedCheckpoint} instances.
     *
     * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
     * available checkpoint.
     */
    void recover() throws Exception;

    /**
     * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
     *
     * <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of
     * retained checkpoints, the oldest one will be discarded via {@link
     * CompletedCheckpoint#discard(ClassLoader)}.
     */
    void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;

    /**
     * Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was
     * added.
     */
    CompletedCheckpoint getLatestCheckpoint() throws Exception;

    /**
     * Discards all added {@link CompletedCheckpoint} instances via {@link
     * CompletedCheckpoint#discard(ClassLoader)}.
     */
    void discardAllCheckpoints() throws Exception;

    /**
     * Returns all {@link CompletedCheckpoint} instances.
     *
     * <p>Returns an empty list if no checkpoint has been added yet.
     */
    List<CompletedCheckpoint> getAllCheckpoints() throws Exception;

    /**
     * Returns the current number of retained checkpoints.
     */
    int getNumberOfRetainedCheckpoints();
}

 

StandaloneCompletedCheckpointStore is essentially just an ArrayDeque that records CompletedCheckpoint instances:

class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {

    /** The completed checkpoints. */
    private final ArrayDeque<CompletedCheckpoint> checkpoints;
}
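
The bounded behaviour described in the interface Javadoc (keep at most N checkpoints, discard the oldest) can be sketched on top of an ArrayDeque as follows. This is an illustrative sketch, not Flink's code: `BoundedCheckpointStore` is a hypothetical class, and checkpoints are reduced to their IDs to keep it short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a bounded in-memory checkpoint store (not Flink's class).
// Each checkpoint is represented only by its ID.
class BoundedCheckpointStore {
    private final int maxRetained;
    private final ArrayDeque<Long> checkpoints = new ArrayDeque<>();

    BoundedCheckpointStore(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("must retain at least one checkpoint");
        }
        this.maxRetained = maxRetained;
    }

    // Adds the newest checkpoint and returns the IDs that were discarded
    // to stay within the bound (oldest first).
    List<Long> addCheckpoint(long checkpointId) {
        checkpoints.addLast(checkpointId);
        List<Long> discarded = new ArrayList<>();
        while (checkpoints.size() > maxRetained) {
            discarded.add(checkpoints.removeFirst());
        }
        return discarded;
    }

    // Returns the most recently added checkpoint ID, or null if none was added.
    Long getLatestCheckpoint() {
        return checkpoints.peekLast();
    }

    int getNumberOfRetainedCheckpoints() {
        return checkpoints.size();
    }
}
```

With `maxRetained = 2`, adding checkpoints 1, 2 and 3 in order leaves 2 and 3 in the store and reports 1 as discarded.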

ZooKeeperCompletedCheckpointStore records the checkpoints in ZooKeeper instead:

/**
 * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
 *
 * <p>Checkpoints are added under a ZNode per job:
 * <pre>
 * +----O /flink/checkpoints/&lt;job-id&gt;  [persistent]
 * .    |
 * .    +----O /flink/checkpoints/&lt;job-id&gt;/1 [persistent]
 * .    .                                  .
 * .    .                                  .
 * .    .                                  .
 * .    +----O /flink/checkpoints/&lt;job-id&gt;/N [persistent]
 * </pre>
 *
 * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one,
 * only the latest one is used and older ones are discarded (even if the maximum number
 * of retained checkpoints is greater than one).
 *
 * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the
 * same program, it is OK to take any valid successful checkpoint as long as the "history" of
 * checkpoints is consistent. Currently, after recovery we start out with only a single
 * checkpoint to circumvent those situations.
 */
public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {

 

 

The snapshot workflow

StreamingJobGraphGenerator

Configures checkpointing:
private void configureCheckpointing() {
    CheckpointConfig cfg = streamGraph.getCheckpointConfig(); // fetch the checkpoint configuration

    if (cfg.isCheckpointingEnabled()) {
        long interval = cfg.getCheckpointInterval(); // checkpoint interval

        // collect the vertices that receive "trigger checkpoint" messages.
        // currently, these are all the sources
        List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();

        // collect the vertices that need to acknowledge the checkpoint
        // currently, these are all vertices
        List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());

        // collect the vertices that receive "commit checkpoint" messages
        // currently, these are all vertices
        List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();

        for (JobVertex vertex : jobVertices.values()) {
            if (vertex.isInputVertex()) { // only source vertices join triggerVertices: checkpoints are triggered at the sources only
                triggerVertices.add(vertex.getID());
            }
            // TODO: add check whether the user function implements the checkpointing interface
            commitVertices.add(vertex.getID()); // currently every vertex joins commitVertices and ackVertices
            ackVertices.add(vertex.getID());
        }

        JobSnapshottingSettings settings = new JobSnapshottingSettings( // build the JobSnapshottingSettings
                triggerVertices, ackVertices, commitVertices, interval,
                cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
                cfg.getMaxConcurrentCheckpoints());
        jobGraph.setSnapshotSettings(settings); // attach the settings to the JobGraph

        // if the user enabled checkpointing, the default number of exec retries is infinite.
        int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
        if (executionRetries == -1) {
            streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
        }
    }
}

 

JobManager

During submitJob, the settings from the JobGraph are passed on to the ExecutionGraph:

private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
  // configure the state checkpointing
  val snapshotSettings = jobGraph.getSnapshotSettings
  if (snapshotSettings != null) {
    val jobId = jobGraph.getJobID()

    val idToVertex: JobVertexID => ExecutionJobVertex = id => {
      val vertex = executionGraph.getJobVertex(id)
      if (vertex == null) {
        throw new JobSubmissionException(jobId,
          "The snapshot checkpointing settings refer to non-existent vertex " + id)
      }
      vertex
    }

    val triggerVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava

    val ackVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava

    val confirmVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava

    val completedCheckpoints = checkpointRecoveryFactory
      .createCompletedCheckpoints(jobId, userCodeLoader)

    val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)

    executionGraph.enableSnapshotCheckpointing(
      snapshotSettings.getCheckpointInterval,
      snapshotSettings.getCheckpointTimeout,
      snapshotSettings.getMinPauseBetweenCheckpoints,
      snapshotSettings.getMaxConcurrentCheckpoints,
      triggerVertices,
      ackVertices,
      confirmVertices,
      context.system,
      leaderSessionID.orNull,
      checkpointIdCounter,
      completedCheckpoints,
      recoveryMode,
      savepointStore)
  }
}

 

ExecutionGraph

Creates the CheckpointCoordinator object:

public void enableSnapshotCheckpointing(
        long interval,
        long checkpointTimeout,
        long minPauseBetweenCheckpoints,
        int maxConcurrentCheckpoints,
        List<ExecutionJobVertex> verticesToTrigger,
        List<ExecutionJobVertex> verticesToWaitFor,
        List<ExecutionJobVertex> verticesToCommitTo,
        ActorSystem actorSystem,
        UUID leaderSessionID,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore completedCheckpointStore,
        RecoveryMode recoveryMode,
        StateStore<Savepoint> savepointStore) throws Exception {

    ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
    ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
    ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);

    // disable to make sure existing checkpoint coordinators are cleared
    disableSnaphotCheckpointing();

    if (isStatsDisabled) {
        checkpointStatsTracker = new DisabledCheckpointStatsTracker();
    }
    else {
        int historySize = jobConfiguration.getInteger(
                ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
                ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);

        checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, tasksToWaitFor);
    }

    // create the coordinator that triggers and commits checkpoints and holds the state
    checkpointCoordinator = new CheckpointCoordinator(
            jobID,
            interval,
            checkpointTimeout,
            minPauseBetweenCheckpoints,
            maxConcurrentCheckpoints,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            userClassLoader,
            checkpointIDCounter,
            completedCheckpointStore,
            recoveryMode,
            checkpointStatsTracker);

    // the periodic checkpoint scheduler is activated and deactivated as a result of
    // job status changes (running -> on, all other states -> off)
    registerJobStatusListener( // register the coordinator's actor in jobStatusListenerActors so it is notified of job status changes
            checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));

As shown here, the checkpointCoordinator is a member of the ExecutionGraph.

Next, the ExecutionGraph is submitted asynchronously:

// execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
// because it is a blocking operation
future {
    try {
      if (isRecovery) {
        executionGraph.restoreLatestCheckpointedState() // restore the checkpointed state
      }
      else {
        //......
        }
        submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) // track the jobGraph in submittedJobGraphs
      }

      jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) // tell the client that the job was submitted successfully

      if (leaderElectionService.hasLeadership) {
        executionGraph.scheduleForExecution(scheduler) // actually schedule the executionGraph
      } else {
        //......
      }
    } catch {
      //.......
    }
}(context.dispatcher)

 

CheckpointCoordinatorDeActivator

/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {

    private final CheckpointCoordinator coordinator;
    private final UUID leaderSessionID;

    @Override
    public void handleMessage(Object message) {
        if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
            JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();

            if (status == JobStatus.RUNNING) {
                // start the checkpoint scheduler
                coordinator.startCheckpointScheduler();
            } else {
                // anything else should stop the trigger for now
                coordinator.stopCheckpointScheduler();
            }
        }

        // we ignore all other messages
    }

    @Override
    public UUID getLeaderSessionID() {
        return leaderSessionID;
    }
}

When the job status changes, the checkpoint scheduler is started (on RUNNING) or stopped (on any other status).

 

CheckpointCoordinator

startCheckpointScheduler starts the periodic trigger:

public void startCheckpointScheduler() {
    synchronized (lock) {
        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        currentPeriodicTrigger = new ScheduledTrigger();
        timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
    }
}

private class ScheduledTrigger extends TimerTask {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis());
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint", e);
        }
    }
}
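
The start/stop pattern used here (cancel any previous task, then schedule a new fixed-rate task on a shared daemon Timer) can be sketched in isolation. `PeriodicTriggerSketch` and its members are hypothetical names for illustration; only `java.util.Timer` and `java.util.TimerTask` are real APIs.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical sketch of a restartable periodic trigger (not Flink's class).
class PeriodicTriggerSketch {
    private final Object lock = new Object();
    private final Timer timer = new Timer("Sketch Timer", true); // daemon thread
    private final long intervalMillis;
    private final Runnable trigger;
    private TimerTask currentTask; // guarded by lock

    PeriodicTriggerSketch(long intervalMillis, Runnable trigger) {
        this.intervalMillis = intervalMillis;
        this.trigger = trigger;
    }

    void start() {
        synchronized (lock) {
            // cancel any prior task so that at most one is ever scheduled
            stop();
            currentTask = new TimerTask() {
                @Override
                public void run() {
                    trigger.run();
                }
            };
            timer.scheduleAtFixedRate(currentTask, intervalMillis, intervalMillis);
        }
    }

    void stop() {
        synchronized (lock) {
            if (currentTask != null) {
                currentTask.cancel();
                currentTask = null;
            }
        }
    }

    boolean isRunning() {
        synchronized (lock) {
            return currentTask != null;
        }
    }
}
```

Calling `start()` twice in a row is safe in this sketch because the first task is cancelled before the second is scheduled, which mirrors the "make sure all prior timers are cancelled" step above.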

 

triggerCheckpoint triggers a single checkpoint:

/**
 * Triggers a new checkpoint and uses the given timestamp as the checkpoint
 * timestamp.
 *
 * @param timestamp The timestamp for the checkpoint.
 * @param nextCheckpointId The checkpoint ID to use for this checkpoint or <code>-1</code> if
 *                         the checkpoint ID counter should be queried.
 */
public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) throws Exception {

    // we will actually trigger this checkpoint!
    final long checkpointID;
    if (nextCheckpointId < 0) {
        try {
            // this must happen outside the locked scope, because it communicates
            // with external services (in HA mode) and may block for a while.
            checkpointID = checkpointIdCounter.getAndIncrement();
        }
        catch (Throwable t) {
        }
    }
    else {
        checkpointID = nextCheckpointId;
    }

    // A checkpoint that has been triggered but not yet fully acknowledged is a PendingCheckpoint;
    // it is given all tasks that must acknowledge the checkpoint (ackTasks).
    // The tasks then acknowledge one by one; once every task in ackTasks has acknowledged,
    // the PendingCheckpoint becomes a CompletedCheckpoint.
    final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);

    // schedule the timer that will clean up the expired checkpoints
    TimerTask canceller = new TimerTask() {
        @Override
        public void run() {
            try {
                synchronized (lock) {
                    // only do the work if the checkpoint is not discarded anyways
                    // note that checkpoint completion discards the pending checkpoint object
                    if (!checkpoint.isDiscarded()) {
                        LOG.info("Checkpoint " + checkpointID + " expired before completing.");

                        checkpoint.discard(userClassLoader);
                        pendingCheckpoints.remove(checkpointID);
                        rememberRecentCheckpointId(checkpointID);

                        onCancelCheckpoint(checkpointID);

                        triggerQueuedRequests();
                    }
                }
            }
            catch (Throwable t) {
                LOG.error("Exception while handling checkpoint timeout", t);
            }
        }
    };

    try {
        // re-acquire the lock
        synchronized (lock) {
            pendingCheckpoints.put(checkpointID, checkpoint); // track the PendingCheckpoint
            timer.schedule(canceller, checkpointTimeout);  // and schedule the canceller for the timeout
        }
        // end of lock scope

        // send the messages to the tasks that trigger their checkpoint
        for (int i = 0; i < tasksToTrigger.length; i++) {
            ExecutionAttemptID id = triggerIDs[i];
            TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
            tasksToTrigger[i].sendMessageToCurrentExecution(message, id); // send the checkpoint message to every task to trigger; here these are only the source tasks
        }

        numUnsuccessfulCheckpointsTriggers = 0;
        return true;
    }
    catch (Throwable t) {
    }
}

--------- The code above sends the checkpoint message only to the sources, so only source tasks go through the flow below -----------

 

TaskManager

The message sent by sendMessageToCurrentExecution is eventually received by the TaskManager:

/**
 * Handler for messages related to checkpoints.
 *
 * @param actorMessage The checkpoint message.
 */
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {

  actorMessage match {
    case message: TriggerCheckpoint =>  // a TriggerCheckpoint message
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp

      val task = runningTasks.get(taskExecutionId) // look up the running task
      if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, timestamp) // ends up calling the task's triggerCheckpointBarrier
      }

    case message: NotifyCheckpointComplete =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp

      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        task.notifyCheckpointComplete(checkpointId) // call the task's notifyCheckpointComplete
      } else {
        log.debug(
          s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
      }

    // unknown checkpoint message
    case _ => unhandled(actorMessage)
  }
}

 

Task

public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
    AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        if (invokable instanceof StatefulTask) {

            // build a local closure
            final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
            final String taskName = taskNameWithSubtask;

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        // the key step: call statefulTask.triggerCheckpoint; the task is still running
                        // at this point, so the checkpoint runs concurrently with it
                        statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
                    }
                    catch (Throwable t) {
                        failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t));
                    }
                }
            };
            executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName);
        }
    }
}

 

StreamTask

StreamTask implements StatefulTask, so the call ends up in StreamTask.triggerCheckpoint, which does the actual checkpoint work:
for (int i = 0; i < states.length; i++) {
    StreamOperator<?> operator = allOperators[i];
    if (operator != null) {
        StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
        states[i] = state.isEmpty() ? null : state;
    }
}

StreamTaskStateList allStates = new StreamTaskStateList(states);
getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
This is the checkpoint call path for sources. Intermediate operators and sinks instead trigger triggerCheckpoint from onEvent, depending on the barriers they have received.
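
How a non-source task might decide when to snapshot can be sketched as follows. The sketch assumes that a task takes its snapshot once a barrier for a given checkpoint has arrived on every one of its input channels; that rule, and the names `BarrierCounterSketch` and `onBarrier`, are assumptions made for illustration and are not taken from the code shown here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch (not Flink's implementation): tracks which input channels
// have delivered a barrier for each checkpoint ID.
class BarrierCounterSketch {
    private final int numInputChannels;
    private final Map<Long, Set<Integer>> channelsSeen = new HashMap<>();

    BarrierCounterSketch(int numInputChannels) {
        if (numInputChannels < 1) {
            throw new IllegalArgumentException("need at least one input channel");
        }
        this.numInputChannels = numInputChannels;
    }

    // Records a barrier and returns true exactly when this barrier is the last
    // one missing for the checkpoint, i.e. when the snapshot should be taken.
    boolean onBarrier(int channel, long checkpointId) {
        if (channel < 0 || channel >= numInputChannels) {
            throw new IllegalArgumentException("unknown channel " + channel);
        }
        Set<Integer> seen = channelsSeen.computeIfAbsent(checkpointId, id -> new HashSet<>());
        if (!seen.add(channel)) {
            return false; // duplicate barrier from the same channel
        }
        if (seen.size() == numInputChannels) {
            channelsSeen.remove(checkpointId); // bookkeeping no longer needed
            return true;
        }
        return false;
    }
}
```

In this sketch, a task with two inputs returns `true` only on the second distinct channel's barrier for a given checkpoint ID; at that point it would call its own equivalent of triggerCheckpoint.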

 

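The key step in StreamTask.triggerCheckpoint is taking a state snapshot of every operator in the task.
The result is StreamTaskStateList allStates, a list holding all of those states.

Finally, either synchronously or asynchronously, it calls

getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);

which sends the state snapshot to the JobManager. Next, let's look at how the JobManager handles it.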

 

RuntimeEnvironment

package org.apache.flink.runtime.taskmanager;

/**
 * An implementation of the {@link Environment}.
 */
public class RuntimeEnvironment implements Environment {

    @Override
    public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
        // try and create a serialized version of the state handle
        SerializedValue<StateHandle<?>> serializedState;
        long stateSize;

        if (state == null) {
            serializedState = null;
            stateSize = 0;
        } else {
            try {
                serializedState = new SerializedValue<StateHandle<?>>(state);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize state handle during checkpoint confirmation", e);
            }

            try {
                stateSize = state.getStateSize();
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to fetch state handle size", e);
            }
        }

        AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
                jobId,
                executionId,
                checkpointId,
                serializedState,
                stateSize);

        jobManager.tell(message);
    }
}

So the ack, together with the serialized state, is sent to the JobManager.

 

JobManager

handleCheckpointMessage

/**
 * Dedicated handler for checkpoint messages.
 *
 * @param actorMessage The checkpoint actor message.
 */
private def handleCheckpointMessage(actorMessage: AbstractCheckpointMessage): Unit = {
  actorMessage match {
    case ackMessage: AcknowledgeCheckpoint =>
      val jid = ackMessage.getJob()
      currentJobs.get(jid) match {
        case Some((graph, _)) =>
          val checkpointCoordinator = graph.getCheckpointCoordinator()
          val savepointCoordinator = graph.getSavepointCoordinator()

          if (checkpointCoordinator != null && savepointCoordinator != null) {
            future {  // process the ack asynchronously in a future
              try {
                if (checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) { // the JobManager received the checkpoint ack message
                  // OK, this is the common case
                }
                else {
                  // Try the savepoint coordinator if the message was not addressed
                  // to the periodic checkpoint coordinator.
                  if (!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
                    log.info("Received message for non-existing checkpoint " +
                      ackMessage.getCheckpointId)
                  }
                }
              }
              catch {
                case t: Throwable =>
                  log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t)
              }
            }(context.dispatcher)
          }

 

CheckpointCoordinator

receiveAcknowledgeMessage

/**
 * Receives an AcknowledgeCheckpoint message and returns whether the
 * message was associated with a pending checkpoint.
 *
 * @param message Checkpoint ack from the task manager
 *
 * @return Flag indicating whether the ack'd checkpoint was associated
 * with a pending checkpoint.
 *
 * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
 */
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {

    final long checkpointId = message.getCheckpointId();

    CompletedCheckpoint completed = null;
    PendingCheckpoint checkpoint;

    // Flag indicating whether the ack message was for a known pending
    // checkpoint.
    boolean isPendingCheckpoint;

    synchronized (lock) {
        checkpoint = pendingCheckpoints.get(checkpointId); // look up the matching pending checkpoint

        if (checkpoint != null && !checkpoint.isDiscarded()) {
            isPendingCheckpoint = true;

            // record this task's ack on the pending checkpoint
            if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) {
                if (checkpoint.isFullyAcknowledged()) { // every task that must ack has done so
                    completed = checkpoint.toCompletedCheckpoint(); // convert to a CompletedCheckpoint

                    completedCheckpointStore.addCheckpoint(completed); // track it in completedCheckpointStore: one full checkpoint is done

                    pendingCheckpoints.remove(checkpointId); // remove it from the pending checkpoints
                    rememberRecentCheckpointId(checkpointId);

                    dropSubsumedCheckpoints(completed.getTimestamp());

                    onFullyAcknowledgedCheckpoint(completed);

                    triggerQueuedRequests();
                }
            }
        }
    }

    // send the confirmation messages to the necessary targets. we do this here
    // to be outside the lock scope
    if (completed != null) {
        final long timestamp = completed.getTimestamp();

        for (ExecutionVertex ev : tasksToCommitTo) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ExecutionAttemptID attemptId = ee.getAttemptId();
                NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
                ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId()); // notify each ExecutionVertex that the checkpoint completed
            }
        }

        statsTracker.onCompletedCheckpoint(completed);
    }

    return isPendingCheckpoint;
}

 

PendingCheckpoint

acknowledgeTask only caches the reported state in collectedStates:

public boolean acknowledgeTask(
        ExecutionAttemptID attemptID,
        SerializedValue<StateHandle<?>> state,
        long stateSize) {

    synchronized (lock) {
        if (discarded) {
            return false;
        }

        ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID);
        if (vertex != null) {
            if (state != null) {
                collectedStates.add(new StateForTask(
                        state,
                        stateSize,
                        vertex.getJobvertexId(),
                        vertex.getParallelSubtaskIndex(),
                        System.currentTimeMillis() - checkpointTimestamp));
            }
            numAcknowledgedTasks++;
            return true;
        }
        else {
            return false;
        }
    }
}

 

Once the acks from all tasks have been received, toCompletedCheckpoint is called:

public CompletedCheckpoint toCompletedCheckpoint() {
    synchronized (lock) {
        if (discarded) {
            throw new IllegalStateException("pending checkpoint is discarded");
        }
        if (notYetAcknowledgedTasks.isEmpty()) {
            CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId,
                    checkpointTimestamp, System.currentTimeMillis(), new ArrayList<StateForTask>(collectedStates));
            dispose(null, false);

            return completed;
        }
        else {
            throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
        }
    }
}

It wraps a copy of collectedStates in a CompletedCheckpoint and returns it.
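The acknowledge-then-complete lifecycle described above can be sketched in isolation. The following is a minimal sketch, not Flink's implementation: the class PendingCheckpointSketch is hypothetical, attempt IDs and state handles are replaced by plain strings, and marking the object as discarded after completion is an assumption about what dispose(null, false) does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for PendingCheckpoint (not Flink's class).
class PendingCheckpointSketch {
    private final Object lock = new Object();
    private final Set<String> notYetAcknowledged;        // attempt ids still outstanding
    private final List<String> collectedStates = new ArrayList<>();
    private boolean discarded;

    PendingCheckpointSketch(Set<String> tasksToWaitFor) {
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    // Returns true only for the first ack of a task that is still awaited.
    boolean acknowledgeTask(String attemptId, String state) {
        synchronized (lock) {
            if (discarded || !notYetAcknowledged.remove(attemptId)) {
                return false;
            }
            if (state != null) {
                collectedStates.add(state);              // stateless tasks ack with null
            }
            return true;
        }
    }

    boolean isFullyAcknowledged() {
        synchronized (lock) {
            return notYetAcknowledged.isEmpty();
        }
    }

    // Copies the collected states out; afterwards this object accepts no more acks.
    List<String> toCompletedCheckpoint() {
        synchronized (lock) {
            if (discarded) {
                throw new IllegalStateException("pending checkpoint is discarded");
            }
            if (!notYetAcknowledged.isEmpty()) {
                throw new IllegalStateException("not all tasks are acknowledged");
            }
            List<String> completed = new ArrayList<>(collectedStates);
            discarded = true;                            // assumed effect of dispose(null, false)
            collectedStates.clear();
            return completed;
        }
    }
}
```

A coordinator would call acknowledgeTask for each incoming ack and call toCompletedCheckpoint only after isFullyAcknowledged returns true. Duplicate or late acks simply return false.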

 

Finally, completedCheckpointStore.addCheckpoint is called to store the checkpoint; see ZooKeeperCompletedCheckpointStore for a concrete implementation.

 

NotifyCheckpointComplete

Likewise, the NotifyCheckpointComplete message goes to the TaskManager, then to the Task, and finally ends up in StreamTask.notifyCheckpointComplete:

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    synchronized (lock) {
        if (isRunning) {
            LOG.debug("Notification of complete checkpoint for task {}", getName());

            // We first notify the state backend if necessary
            if (stateBackend instanceof CheckpointNotifier) {
                ((CheckpointNotifier) stateBackend).notifyCheckpointComplete(checkpointId);
            }

            for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                if (operator != null) {
                    operator.notifyOfCompletedCheckpoint(checkpointId);
                }
            }
        }
        else {
            LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
        }
    }
}
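To illustrate why an operator would care about this callback, here is a minimal sketch of one common use: holding back side effects until the checkpoint that covers them is confirmed. The class CommitOnNotifySketch and its write and snapshot methods are hypothetical and are not part of Flink. Committing every batch with an ID less than or equal to the notified one is a design assumption, chosen so that a missed notification is covered by the next one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical operator-side sketch; names are illustrative only.
class CommitOnNotifySketch {
    // Sealed batches, keyed by the checkpoint id that covers them.
    private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    private List<String> currentBuffer = new ArrayList<>();
    final List<String> committed = new ArrayList<>();

    void write(String record) {
        currentBuffer.add(record);
    }

    // Called when the snapshot for checkpointId is taken: seal the current buffer.
    void snapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, currentBuffer);
        currentBuffer = new ArrayList<>();
    }

    // Called once the coordinator has confirmed checkpointId.
    void notifyOfCompletedCheckpoint(long checkpointId) {
        // View of all sealed batches with id <= checkpointId, in ascending order.
        Map<Long, List<String>> ready = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<String> batch : ready.values()) {
            committed.addAll(batch);
        }
        ready.clear(); // clearing the view removes the entries from the backing map
    }
}
```

Records written after the last snapshot stay in the buffer and are not committed until a later checkpoint is confirmed.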

That is the complete checkpoint process.

 

Next, let's look at the restore process.

 

The restore process

When a job is submitted, the following method is called:

executionGraph.restoreLatestCheckpointedState()

/**
 * Restores the latest checkpointed state.
 *
 * <p>The recovery of checkpoints might block. Make sure that calls to this method don't
 * block the job manager actor and run asynchronously.
 */
public void restoreLatestCheckpointedState() throws Exception {
    synchronized (progressLock) {
        if (checkpointCoordinator != null) {
            checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
        }
    }
}

 

restoreLatestCheckpointedState

public void restoreLatestCheckpointedState(
        Map<JobVertexID, ExecutionJobVertex> tasks,
        boolean errorIfNoCheckpoint,
        boolean allOrNothingState) throws Exception {
    synchronized (lock) {
        // Recover the checkpoints
        // For ZooKeeperCompletedCheckpointStore:
        // Gets the latest checkpoint from ZooKeeper and removes all others.
        completedCheckpointStore.recover();

        // restore from the latest checkpoint
        // (fetch the most recent CompletedCheckpoint from completedCheckpointStore)
        CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();

        long recoveryTimestamp = System.currentTimeMillis();

        if (allOrNothingState) { // every subtask of a vertex gets state, or none does
            Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>();

            for (StateForTask state : latest.getStates()) {
                ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
                Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
                exec.setInitialState(state.getState(), recoveryTimestamp); // restore the state

                Integer count = stateCounts.get(vertex); // count restored states per vertex
                if (count != null) {
                    stateCounts.put(vertex, count+1);
                } else {
                    stateCounts.put(vertex, 1);
                }
            }

            // validate that either all task vertices have state, or none
            for (Map.Entry<ExecutionJobVertex, Integer> entry : stateCounts.entrySet()) {
                ExecutionJobVertex vertex = entry.getKey();
                // if the number of restored states differs from the vertex parallelism,
                // some subtasks were not restored, so throw
                if (entry.getValue() != vertex.getParallelism()) {
                    throw new IllegalStateException(
                            "The checkpoint contained state only for a subset of tasks for vertex " + vertex);
                }
            }
        }
        else {
            for (StateForTask state : latest.getStates()) {
                ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
                Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
                exec.setInitialState(state.getState(), recoveryTimestamp);
            }
        }
    }
}
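The all-or-nothing check in the allOrNothingState branch can be looked at on its own. The sketch below is hypothetical (the class AllOrNothingSketch and its string vertex IDs are not Flink types) and assumes each restored state is identified only by the vertex it belongs to. A vertex with no states at all never enters the count map, which is how the "none" case passes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "all subtasks or none" validation; not Flink code.
class AllOrNothingSketch {

    // stateVertexIds: one entry per restored subtask state, naming its vertex.
    // parallelism: vertex id -> number of parallel subtasks of that vertex.
    static void validate(List<String> stateVertexIds, Map<String, Integer> parallelism) {
        Map<String, Integer> stateCounts = new HashMap<>();
        for (String vertexId : stateVertexIds) {
            stateCounts.merge(vertexId, 1, Integer::sum); // count states per vertex
        }

        // Only vertices that received at least one state are checked.
        for (Map.Entry<String, Integer> entry : stateCounts.entrySet()) {
            Integer expected = parallelism.get(entry.getKey());
            if (!entry.getValue().equals(expected)) {
                throw new IllegalStateException(
                        "The checkpoint contained state only for a subset of tasks for vertex "
                                + entry.getKey());
            }
        }
    }
}
```

For example, with parallelism {a=2, b=1, c=3}, the states [a, a, b] pass (c has none), while [a] alone is rejected.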

 

Execution

public void setInitialState(SerializedValue<StateHandle<?>> initialState, long recoveryTimestamp) {
    if (state != ExecutionState.CREATED) {
        throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
    }
    this.operatorState = initialState;
    this.recoveryTimestamp = recoveryTimestamp;
}

As you can see, recovery at this point only assigns the state from the checkpoint fetched from ZooKeeper to operatorState.

Later, deployToSlot wraps this initial state into the deployment descriptor and submits it to the TaskManager:

public void deployToSlot(final SimpleSlot slot) throws JobException {
    final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState, recoveryTimestamp, attemptNumber);
    final Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
}

 

In the TaskManager, submitTask creates a Task and runs it.

 

Task.run()

// the very last thing before the actual execution starts running is to inject
// the state into the task. the state is non-empty if this is an execution
// of a task that failed but had backuped state from a checkpoint

// get our private reference onto the stack (be safe against concurrent changes)
SerializedValue<StateHandle<?>> operatorState = this.operatorState; // the state to restore
long recoveryTs = this.recoveryTs;

if (operatorState != null) {
    if (invokable instanceof StatefulTask) { // only a stateful task can take state
        try {
            StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader); // deserialize the handle
            StatefulTask<?> op = (StatefulTask<?>) invokable;
            StateUtils.setOperatorState(op, state, recoveryTs); // hand the state to the task
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
        }
    }
    else {
        throw new IllegalStateException("Found operator state for a non-stateful task invokable");
    }
}

// be memory and GC friendly - since the code stays in invoke() for a potentially long time,
// we clear the reference to the state handle
//noinspection UnusedAssignment
operatorState = null;
this.operatorState = null;
 

StateUtils

public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op,
        StateHandle<?> state, long recoveryTimestamp) throws Exception {
    @SuppressWarnings("unchecked")
    StatefulTask<T> typedOp = (StatefulTask<T>) op;
    @SuppressWarnings("unchecked")
    T typedHandle = (T) state;
    typedOp.setInitialState(typedHandle, recoveryTimestamp);
}

 

StreamTask

@Override
public void setInitialState(StreamTaskStateList initialState, long recoveryTimestamp) {
    lazyRestoreState = initialState; // only stash the state in lazyRestoreState for now
    this.recoveryTimestamp = recoveryTimestamp;
}

// StreamTask.invoke() calls restoreStateLazy, which performs the actual state restore
public void restoreStateLazy() throws Exception {
    if (lazyRestoreState != null) {

        try {
            final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
            final StreamTaskState[] states = lazyRestoreState.getState(userClassLoader); // fetch all states

            // be GC friendly
            lazyRestoreState = null;

            for (int i = 0; i < states.length; i++) {
                StreamTaskState state = states[i];
                StreamOperator<?> operator = allOperators[i];

                if (state != null && operator != null) {
                    operator.restoreState(state, recoveryTimestamp); // finally restore the state into the operator
                }
                else if (operator != null) {
                }
            }
        }
        catch (Exception e) {
            throw new Exception("Could not restore checkpointed state to operators and functions", e);
        }
    }
}
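The two-step pattern here (stash the state when it is assigned, apply it only once the task is invoked and its operators exist) can be sketched without any Flink types. Everything in the following block is hypothetical: LazyRestoreSketch, its nested Operator interface, and the use of plain strings as state are assumptions made for illustration.

```java
// Hypothetical sketch of deferred state restore; not Flink's StreamTask.
class LazyRestoreSketch {

    interface Operator {
        void restoreState(String state);
    }

    private final Operator[] operators;   // one slot per operator in the chain, may hold nulls
    private String[] lazyRestoreState;    // one state per operator slot, may hold nulls

    LazyRestoreSketch(Operator[] operators) {
        this.operators = operators;
    }

    // Called at deployment time: remember the state, do not touch the operators yet.
    void setInitialState(String[] states) {
        this.lazyRestoreState = states;
    }

    // Called when the task starts running.
    void invoke() {
        restoreStateLazy();
        // ... the actual processing loop would follow here
    }

    private void restoreStateLazy() {
        if (lazyRestoreState == null) {
            return;                        // fresh start, or state already applied
        }
        String[] states = lazyRestoreState;
        lazyRestoreState = null;           // drop the reference so it can be collected

        for (int i = 0; i < states.length; i++) {
            if (states[i] != null && operators[i] != null) {
                operators[i].restoreState(states[i]);
            }
        }
    }
}
```

States and operators are matched by position, so the order of the operator chain at restore time has to match the order at snapshot time.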
