
Flink - State Management

The earlier post, Flink – Checkpoint, walked through the overall checkpoint flow, but it did not describe in detail how a snapshot is generated or how it is restored. This post fills in that part.

 

StreamOperator

/**
 * Basic interface for stream operators. Implementers would implement one of
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
 * that process elements.
 *
 * <p> The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
 * offers default implementation for the lifecycle and properties methods.
 *
 * <p> Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
 * the timer service, timer callbacks are also guaranteed not to be called concurrently with
 * methods on {@code StreamOperator}.
 *
 * @param <OUT> The output type of the operator
 */
public interface StreamOperator<OUT> extends Serializable {

    // ------------------------------------------------------------------------
    //  life cycle
    // ------------------------------------------------------------------------

    /**
     * Initializes the operator. Sets access to the context and the output.
     */
    void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);

    /**
     * This method is called immediately before any elements are processed, it should contain the
     * operator's initialization logic.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void open() throws Exception;

    /**
     * This method is called after all records have been added to the operators via the methods
     * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
     * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
     *
     * <p>The method is expected to flush all remaining buffered data. Exceptions during this flushing
     * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
     * because the last data items are not processed properly.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void close() throws Exception;

    /**
     * This method is called at the very end of the operator's life, both in the case of a successful
     * completion of the operation, and in the case of a failure and canceling.
     *
     * This method is expected to make a thorough effort to release all resources
     * that the operator has acquired.
     */
    void dispose();

    // ------------------------------------------------------------------------
    //  state snapshots
    // ------------------------------------------------------------------------

    /**
     * Called to draw a state snapshot from the operator. This method snapshots the operator state
     * (if the operator is stateful) and the key/value state (if it is being used and has been
     * initialized).
     *
     * @param checkpointId The ID of the checkpoint.
     * @param timestamp The timestamp of the checkpoint.
     *
     * @return The StreamTaskState object, possibly containing the snapshots for the
     *         operator and key/value state.
     *
     * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator
     *                   and the key/value state.
     */
    StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;

    /**
     * Restores the operator state, if this operator's execution is recovering from a checkpoint.
     * This method restores the operator state (if the operator is stateful) and the key/value state
     * (if it had been used and was initialized when the snapshot occurred).
     *
     * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
     * and before {@link #open()}.
     *
     * @param state The state of operator that was snapshotted as part of checkpoint
     *              from which the execution is restored.
     *
     * @param recoveryTimestamp Global recovery timestamp
     *
     * @throws Exception Exceptions during state restore should be forwarded, so that the system can
     *                   properly react to failed state restore and fail the execution attempt.
     */
    void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;

    /**
     * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.
     *
     * @param checkpointId The ID of the checkpoint that has been completed.
     *
     * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause
     *                   the program to fail and enter recovery.
     */
    void notifyOfCompletedCheckpoint(long checkpointId) throws Exception;

    // ------------------------------------------------------------------------
    //  miscellaneous
    // ------------------------------------------------------------------------

    void setKeyContextElement(StreamRecord<?> record) throws Exception;

    /**
     * An operator can return true here to disable copying of its input elements. This overrides
     * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
     */
    boolean isInputCopyingDisabled();

    ChainingStrategy getChainingStrategy();

    void setChainingStrategy(ChainingStrategy strategy);
}

This pair of interface methods is responsible for snapshotting the operator's state and for restoring that state from a snapshot:

StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;

void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;

 

First, note that both the snapshot and the restore path exchange state through a StreamTaskState object:

public class StreamTaskState implements Serializable, Closeable {

    private static final long serialVersionUID = 1L;

    private StateHandle<?> operatorState;

    private StateHandle<Serializable> functionState;

    private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates;

As the fields show, StreamTaskState is a wrapper around three kinds of state: the operator state, the function state, and the key/value (kv) states.
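To make the driving side concrete, here is a toy, self-contained sketch (not Flink code; ToyOperator and ToyTaskState are hypothetical stand-ins for StreamOperator and StreamTaskState) of how a task could ask every operator in its chain for a state container when a checkpoint is triggered, producing one container per operator.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of "one state container per operator in the chain".
public class ChainSnapshotSketch {

    interface ToyOperator {
        // mirrors StreamOperator.snapshotOperatorState(checkpointId, timestamp)
        ToyTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception;
    }

    static class ToyTaskState implements Serializable {
        Serializable operatorState;   // corresponds to StateHandle<?> operatorState
        Serializable functionState;   // corresponds to StateHandle<Serializable> functionState
        Serializable kvStates;        // corresponds to HashMap<String, KvStateSnapshot<...>> kvStates
    }

    // collect the state of every operator in the chain for one checkpoint
    static List<ToyTaskState> snapshotChain(List<ToyOperator> chain,
                                            long checkpointId, long timestamp) throws Exception {
        List<ToyTaskState> states = new ArrayList<>(chain.size());
        for (ToyOperator op : chain) {
            states.add(op.snapshotOperatorState(checkpointId, timestamp));
        }
        return states;
    }
}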

Now look at AbstractStreamOperator. For the moment consider only the kv-state case; the other two kinds are simpler:

@Override
public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
    // here, we deal with key/value state snapshots

    StreamTaskState state = new StreamTaskState();

    if (stateBackend != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots =
            stateBackend.snapshotPartitionedState(checkpointId, timestamp);
        if (partitionedSnapshots != null) {
            state.setKvStates(partitionedSnapshots);
        }
    }

    return state;
}

@Override
@SuppressWarnings("rawtypes,unchecked")
public void restoreState(StreamTaskState state) throws Exception {
    // restore the key/value state. the actual restore happens lazily, when the function requests
    // the state again, because the restore method needs information provided by the user function
    if (stateBackend != null) {
        stateBackend.injectKeyValueStateSnapshots((HashMap) state.getKvStates());
    }
}

Compared with earlier releases, Flink 1.1.0 simplifies this logic: the actual work is delegated entirely to the stateBackend.
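For context, the key/value state that ends up in the stateBackend is what a user function registers through a state descriptor; the descriptor name ("count" below) is the same name that later keys the kvStates map in the snapshot. The sketch uses the user-facing Flink 1.x DataStream API; exact descriptor constructors vary between versions, so treat it as a sketch rather than version-exact code.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Each keyed state is registered under a name ("count"); the backend keeps it in
// keyValueStatesByName and snapshots it under that same name at checkpoint time.
public class CountPerKey extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        // old Flink 1.x style descriptor: name, type, default value
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Long.class, 0L);
        count = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        long next = count.value() + 1;  // value() is scoped to the current key
        count.update(next);
        out.collect(next);
    }
}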

 

AbstractStateBackend
/**
 * A state backend defines how state is stored and snapshotted during checkpoints.
 */
public abstract class AbstractStateBackend implements java.io.Serializable {

    protected transient TypeSerializer<?> keySerializer;

    protected transient ClassLoader userCodeClassLoader;

    protected transient Object currentKey;

    /** For efficient access in setCurrentKey() */
    private transient KvState<?, ?, ?, ?, ?>[] keyValueStates; // array form, kept for fast iteration

    /** So that we can give out state when the user uses the same key. */
    protected transient HashMap<String, KvState<?, ?, ?, ?, ?>> keyValueStatesByName; // kvState registered per state name

    /** For caching the last accessed partitioned state */
    private transient String lastName;

    @SuppressWarnings("rawtypes")
    private transient KvState lastState;

 

stateBackend.snapshotPartitionedState

public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
    if (keyValueStates != null) {
        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());

        for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : keyValueStatesByName.entrySet()) {
            KvStateSnapshot<?, ?, ?, ?, ?> snapshot = entry.getValue().snapshot(checkpointId, timestamp);
            snapshots.put(entry.getKey(), snapshot);
        }
        return snapshots;
    }
    return null;
}

The logic is straightforward: take a snapshot of every cached kvState and put it into the HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots, keyed by state name.

 

stateBackend.injectKeyValueStateSnapshots is simply the inverse of the above:

/**
 * Injects K/V state snapshots for lazy restore.
 * @param keyValueStateSnapshots The Map of snapshots
 */
@SuppressWarnings("unchecked,rawtypes")
public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
    if (keyValueStateSnapshots != null) {
        if (keyValueStatesByName == null) {
            keyValueStatesByName = new HashMap<>();
        }

        for (Map.Entry<String, KvStateSnapshot> state : keyValueStateSnapshots.entrySet()) {
            KvState kvState = state.getValue().restoreState(this,
                keySerializer,
                userCodeClassLoader);
            keyValueStatesByName.put(state.getKey(), kvState);
        }
        keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
    }
}
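The symmetry between the two methods is easier to see with the generics stripped away. The following toy, JDK-only round trip (MiniKvState and MiniSnapshot are hypothetical, not Flink classes) mirrors the pattern: freeze every state registered under a name into a name-keyed snapshot map, then rebuild the name-keyed registry from that map.

import java.util.HashMap;
import java.util.Map;

// Toy model of the backend's two symmetric operations:
// snapshotAll() ~ snapshotPartitionedState(), injectAll() ~ injectKeyValueStateSnapshots().
public class SnapshotRoundTrip {

    static class MiniKvState {
        final Map<String, String> data = new HashMap<>();
        MiniSnapshot snapshot() { return new MiniSnapshot(new HashMap<>(this.data)); }
    }

    static class MiniSnapshot {
        final Map<String, String> frozen;
        MiniSnapshot(Map<String, String> frozen) { this.frozen = frozen; }
        MiniKvState restore() {
            MiniKvState s = new MiniKvState();
            s.data.putAll(frozen);
            return s;
        }
    }

    // like keyValueStatesByName: one state object per registered state name
    final Map<String, MiniKvState> statesByName = new HashMap<>();

    Map<String, MiniSnapshot> snapshotAll() {
        Map<String, MiniSnapshot> snapshots = new HashMap<>();
        for (Map.Entry<String, MiniKvState> e : statesByName.entrySet()) {
            snapshots.put(e.getKey(), e.getValue().snapshot());
        }
        return snapshots;
    }

    void injectAll(Map<String, MiniSnapshot> snapshots) {
        for (Map.Entry<String, MiniSnapshot> e : snapshots.entrySet()) {
            statesByName.put(e.getKey(), e.getValue().restore());
        }
    }
}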

 

Now let's look at the concrete snapshot and restore logic of FsState.

AbstractFsState.snapshot

@Override
public KvStateSnapshot<K, N, S, SD, FsStateBackend> snapshot(long checkpointId, long timestamp) throws Exception {
    try (FsStateBackend.FsCheckpointStateOutputStream out =
            backend.createCheckpointStateOutputStream(checkpointId, timestamp)) { // per-checkpoint output stream in the backend's file system

        // serialize the state to the output stream
        DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new DataOutputStream(out));

        outView.writeInt(state.size());

        for (Map.Entry<N, Map<K, SV>> namespaceState : state.entrySet()) {
            N namespace = namespaceState.getKey();
            namespaceSerializer.serialize(namespace, outView);

            outView.writeInt(namespaceState.getValue().size());
            for (Map.Entry<K, SV> entry : namespaceState.getValue().entrySet()) {
                keySerializer.serialize(entry.getKey(), outView);
                stateSerializer.serialize(entry.getValue(), outView);
            }
        }
        outView.flush(); // the actual contents are flushed to the file

        // create a handle to the state
        return createHeapSnapshot(out.closeAndGetPath()); // the snapshot only needs to keep the file path
    }
}
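So the file layout produced by snapshot() is: the number of namespaces, then for each namespace the serialized namespace, the count of key/value pairs under it, and the serialized pairs themselves. A JDK-only sketch of the same nesting (plain DataOutputStream and String/Integer standing in for Flink's TypeSerializers) looks like this:

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

// Sketch of the nesting written by AbstractFsState.snapshot().
public class FsSnapshotLayout {

    static void write(Map<String, Map<String, Integer>> state, OutputStream out) throws IOException {
        DataOutputStream view = new DataOutputStream(out);
        view.writeInt(state.size());                             // number of namespaces
        for (Map.Entry<String, Map<String, Integer>> ns : state.entrySet()) {
            view.writeUTF(ns.getKey());                          // ~ namespaceSerializer.serialize(...)
            view.writeInt(ns.getValue().size());                 // entries under this namespace
            for (Map.Entry<String, Integer> kv : ns.getValue().entrySet()) {
                view.writeUTF(kv.getKey());                      // ~ keySerializer.serialize(...)
                view.writeInt(kv.getValue());                    // ~ stateSerializer.serialize(...)
            }
        }
        view.flush();                                            // data only hits the file on flush
    }
}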

 

createCheckpointStateOutputStream

@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
    checkFileSystemInitialized();

    Path checkpointDir = createCheckpointDirPath(checkpointID); // derive the checkpoint directory path from the checkpoint id
    int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
    return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
}

 

FsCheckpointStateOutputStream

It wraps the write, flush, and closeAndGetPath operations:

public void flush() throws IOException {
    if (!closed) {
        // initialize stream if this is the first flush (stream flush, not Darjeeling harvest)
        if (outStream == null) {

            // make sure the directory for that specific checkpoint exists
            fs.mkdirs(basePath);

            Exception latestException = null;

            for (int attempt = 0; attempt < 10; attempt++) {
                try {
                    statePath = new Path(basePath, UUID.randomUUID().toString());
                    outStream = fs.create(statePath, false);
                    break;
                }
                catch (Exception e) {
                    latestException = e;
                }
            }

            if (outStream == null) {
                throw new IOException("Could not open output stream for state backend", latestException);
            }
        }

        // now flush
        if (pos > 0) {
            outStream.write(writeBuffer, 0, pos);
            pos = 0;
        }
    }
}
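The pattern here is: bytes accumulate in an in-memory writeBuffer, and the backing file, named with a random UUID under the checkpoint directory, is only created on the first flush. A stripped-down, JDK-only sketch of that lazy-creation pattern (a hypothetical class, using java.nio instead of Flink's FileSystem abstraction):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Toy version of the "create the file lazily on first flush" pattern
// used by FsCheckpointStateOutputStream.
public class LazyFileBuffer {

    private final Path baseDir;
    private final byte[] writeBuffer = new byte[4096];
    private int pos;
    private OutputStream outStream;   // null until the first flush
    private Path statePath;

    public LazyFileBuffer(Path baseDir) { this.baseDir = baseDir; }

    public void write(int b) throws IOException {
        if (pos == writeBuffer.length) {
            flush();
        }
        writeBuffer[pos++] = (byte) b;
    }

    public void flush() throws IOException {
        if (outStream == null) {
            Files.createDirectories(baseDir);                          // make sure the checkpoint dir exists
            statePath = baseDir.resolve(UUID.randomUUID().toString()); // random file name, like Flink
            outStream = Files.newOutputStream(statePath);
        }
        if (pos > 0) {
            outStream.write(writeBuffer, 0, pos);                      // spill the in-memory buffer
            pos = 0;
        }
    }

    public Path closeAndGetPath() throws IOException {
        flush();
        outStream.close();
        return statePath;                                              // the snapshot keeps only this path
    }
}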

 

AbstractFsStateSnapshot.restoreState

@Override
public KvState<K, N, S, SD, FsStateBackend> restoreState(
    FsStateBackend stateBackend,
    final TypeSerializer<K> keySerializer,
    ClassLoader classLoader) throws Exception {

    // state restore
    ensureNotClosed();

    try (FSDataInputStream inStream = stateBackend.getFileSystem().open(getFilePath())) {
        // make sure the in-progress restore from the handle can be closed
        registerCloseable(inStream);

        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);

        final int numKeys = inView.readInt();
        HashMap<N, Map<K, SV>> stateMap = new HashMap<>(numKeys);

        for (int i = 0; i < numKeys; i++) {
            N namespace = namespaceSerializer.deserialize(inView);
            final int numValues = inView.readInt();
            Map<K, SV> namespaceMap = new HashMap<>(numValues);
            stateMap.put(namespace, namespaceMap);
            for (int j = 0; j < numValues; j++) {
                K key = keySerializer.deserialize(inView);
                SV value = stateSerializer.deserialize(inView);
                namespaceMap.put(key, value);
            }
        }

        return createFsState(stateBackend, stateMap); // rebuild the in-memory KvState from the deserialized map
    }
    catch (Exception e) {
        throw new Exception("Failed to restore state from file system", e);
    }
}
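Restoring simply walks the written layout in reverse. As a counterpart to the writer sketch above, a JDK-only reader for the same toy format would look like this (note that the first int is the number of namespaces, even though the Flink code names the variable numKeys):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

// Counterpart of the writer sketch: rebuild the namespace -> (key -> value)
// map from the layout that AbstractFsState.snapshot() writes.
public class FsSnapshotReader {

    static Map<String, Map<String, Integer>> read(InputStream in) throws IOException {
        DataInputStream view = new DataInputStream(in);
        int numNamespaces = view.readInt();                        // called numKeys in the Flink code
        Map<String, Map<String, Integer>> state = new HashMap<>(numNamespaces);
        for (int i = 0; i < numNamespaces; i++) {
            String namespace = view.readUTF();
            int numValues = view.readInt();
            Map<String, Integer> namespaceMap = new HashMap<>(numValues);
            state.put(namespace, namespaceMap);
            for (int j = 0; j < numValues; j++) {
                namespaceMap.put(view.readUTF(), view.readInt());
            }
        }
        return state;
    }
}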
