Preface
This article takes a look at Flink's OperatorStateBackend.
OperatorStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateBackend.java
```java
/**
 * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface
 * {@link Snapshotable}
 *
 */
public interface OperatorStateBackend extends
    OperatorStateStore,
    Snapshotable<SnapshotResult<OperatorStateHandle>, Collection<OperatorStateHandle>>,
    Closeable,
    Disposable {

    @Override
    void dispose();
}
```
- The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable and Disposable interfaces.
OperatorStateStore
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/OperatorStateStore.java
```java
/**
 * This interface contains methods for registering operator state with a managed store.
 */
@PublicEvolving
public interface OperatorStateStore {

    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;

    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    Set<String> getRegisteredStateNames();

    Set<String> getRegisteredBroadcastStateNames();

    // -------------------------------------------------------------------------------------------
    //  Deprecated methods
    // -------------------------------------------------------------------------------------------

    @Deprecated
    <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    @Deprecated
    <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
}
```
- OperatorStateStore defines getBroadcastState, getListState and getUnionListState, which create or restore a BroadcastState or ListState; it also defines getRegisteredStateNames and getRegisteredBroadcastStateNames, which return the names of the currently registered states.
Snapshotable
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Snapshotable.java
```java
/**
 * Interface for operators that can perform snapshots of their state.
 *
 * @param <S> Generic type of the state object that is created as handle to snapshots.
 * @param <R> Generic type of the state object that used in restore.
 */
@Internal
public interface Snapshotable<S extends StateObject, R> extends SnapshotStrategy<S> {

    /**
     * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state
     * handles from which the old state is read.
     *
     * @param state the old state to restore.
     */
    void restore(@Nullable R state) throws Exception;
}
```
- Snapshotable extends the SnapshotStrategy interface and additionally defines a restore method for restoring state.
SnapshotStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotStrategy.java
```java
/**
 * Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at
 * least threadsafe, i.e. this is a functional interface and is can be called in parallel by multiple checkpoints.
 *
 * @param <S> type of the returned state object that represents the result of the snapshot operation.
 */
@Internal
public interface SnapshotStrategy<S extends StateObject> {

    /**
     * Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and
     * returns a @{@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation if
     * the operation is performed synchronous or asynchronous. In the later case, the returned Runnable must be executed
     * first before obtaining the handle.
     *
     * @param checkpointId      The ID of the checkpoint.
     * @param timestamp         The timestamp of the checkpoint.
     * @param streamFactory     The factory that we can use for writing our state to streams.
     * @param checkpointOptions Options for how to perform this checkpoint.
     * @return A runnable future that will yield a {@link StateObject}.
     */
    @Nonnull
    RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception;
}
```
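- SnapshotStrategy defines the snapshot method, which each snapshot strategy implements in its own way; the type parameter bound S extends StateObject requires the snapshot result to be a StateObject.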
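To make the "synchronous or asynchronous" contract of SnapshotStrategy concrete, here is a minimal, self-contained sketch that uses only java.util.concurrent. SimpleStateObject, SimpleSnapshotStrategy, ByteArrayState and ToyStrategy are hypothetical stand-ins, not Flink classes, and the checkpoint parameters are reduced to a single id. In synchronous mode the FutureTask is run before it is returned; in asynchronous mode the caller has to run it (here on a worker thread) before get() yields the handle. This is the same "if (!asynchronousSnapshots) task.run();" pattern that DefaultOperatorStateBackendSnapshotStrategy uses further down.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical, trimmed-down stand-in for Flink's StateObject.
interface SimpleStateObject {
    long getStateSize();
}

// Hypothetical stand-in for SnapshotStrategy<S extends StateObject> (arguments reduced to an id).
interface SimpleSnapshotStrategy<S extends SimpleStateObject> {
    RunnableFuture<S> snapshot(long checkpointId) throws Exception;
}

// A "handle" that simply keeps the snapshotted bytes in memory.
final class ByteArrayState implements SimpleStateObject {
    private final byte[] bytes;

    ByteArrayState(byte[] bytes) {
        this.bytes = bytes;
    }

    @Override
    public long getStateSize() {
        return bytes.length;
    }
}

final class ToyStrategy implements SimpleSnapshotStrategy<ByteArrayState> {
    private final boolean asynchronous;
    private final byte[] liveData;

    ToyStrategy(boolean asynchronous, byte[] liveData) {
        this.asynchronous = asynchronous;
        this.liveData = liveData;
    }

    @Override
    public RunnableFuture<ByteArrayState> snapshot(long checkpointId) {
        // Synchronous part: take a private copy of the live data.
        final byte[] copy = liveData.clone();
        // The remaining work is packaged as a task that yields the handle.
        FutureTask<ByteArrayState> task = new FutureTask<>(() -> new ByteArrayState(copy));
        if (!asynchronous) {
            task.run(); // synchronous mode: the handle is ready when snapshot() returns
        }
        return task; // asynchronous mode: the caller must run() the task first
    }
}

final class SnapshotStrategyDemo {
    // Obtains the handle, running the task on a worker thread if it is not done yet.
    static long snapshotSize(boolean asynchronous, byte[] data) {
        RunnableFuture<ByteArrayState> future = new ToyStrategy(asynchronous, data).snapshot(1L);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            if (!future.isDone()) {
                pool.execute(future);
            }
            return future.get().getStateSize();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(snapshotSize(false, new byte[] {1, 2, 3}));    // 3
        System.out.println(snapshotSize(true, new byte[] {1, 2, 3, 4})); // 4
    }
}
```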
AbstractSnapshotStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractSnapshotStrategy.java
```java
/**
 * Abstract base class for implementing {@link SnapshotStrategy}, that gives a consistent logging across state backends.
 *
 * @param <T> type of the snapshot result.
 */
public abstract class AbstractSnapshotStrategy<T extends StateObject> implements SnapshotStrategy<SnapshotResult<T>> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotStrategy.class);

    private static final String LOG_SYNC_COMPLETED_TEMPLATE = "{} ({}, synchronous part) in thread {} took {} ms.";
    private static final String LOG_ASYNC_COMPLETED_TEMPLATE = "{} ({}, asynchronous part) in thread {} took {} ms.";

    /** Descriptive name of the snapshot strategy that will appear in the log outputs and {@link #toString()}. */
    @Nonnull
    protected final String description;

    protected AbstractSnapshotStrategy(@Nonnull String description) {
        this.description = description;
    }

    /**
     * Logs the duration of the synchronous snapshot part from the given start time.
     */
    public void logSyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
        logCompletedInternal(LOG_SYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
    }

    /**
     * Logs the duration of the asynchronous snapshot part from the given start time.
     */
    public void logAsyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
        logCompletedInternal(LOG_ASYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
    }

    private void logCompletedInternal(
        @Nonnull String template,
        @Nonnull Object checkpointOutDescription,
        long startTime) {

        long duration = (System.currentTimeMillis() - startTime);

        LOG.debug(
            template,
            description,
            checkpointOutDescription,
            Thread.currentThread(),
            duration);
    }

    @Override
    public String toString() {
        return "SnapshotStrategy {" + description + "}";
    }
}
```
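- AbstractSnapshotStrategy is an abstract class. It does not implement the snapshot method declared by SnapshotStrategy; it only provides logSyncCompleted and logAsyncCompleted, which log the duration of the synchronous and asynchronous parts of a snapshot at debug level.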
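The split between AbstractSnapshotStrategy (shared logging, no snapshot logic) and its subclasses is a template-method style base class. The sketch below reproduces that shape without Flink or SLF4J. LoggingSnapshotStrategy, CountingStrategy and LoggingDemo are hypothetical names, and String.format replaces LOG.debug so that the messages can be inspected; only the message templates and the toString format are taken from the source above.

```java
// Hypothetical sketch: shared logging in the base class, snapshot logic left to subclasses.
abstract class LoggingSnapshotStrategy<T> {
    private static final String SYNC_TEMPLATE = "%s (%s, synchronous part) in thread %s took %d ms.";
    private static final String ASYNC_TEMPLATE = "%s (%s, asynchronous part) in thread %s took %d ms.";

    protected final String description;

    protected LoggingSnapshotStrategy(String description) {
        this.description = description;
    }

    // Left to subclasses, like SnapshotStrategy#snapshot.
    abstract T snapshot(long checkpointId);

    String syncCompletedMessage(Object checkpointOutDescription, long startTime) {
        return completedMessage(SYNC_TEMPLATE, checkpointOutDescription, startTime);
    }

    String asyncCompletedMessage(Object checkpointOutDescription, long startTime) {
        return completedMessage(ASYNC_TEMPLATE, checkpointOutDescription, startTime);
    }

    // Builds the message instead of logging it, so callers can print or check it.
    private String completedMessage(String template, Object checkpointOutDescription, long startTime) {
        long duration = System.currentTimeMillis() - startTime;
        return String.format(template, description, checkpointOutDescription, Thread.currentThread(), duration);
    }

    @Override
    public String toString() {
        return "SnapshotStrategy {" + description + "}";
    }
}

// A trivial concrete strategy whose "snapshot" is just an incrementing counter.
final class CountingStrategy extends LoggingSnapshotStrategy<Integer> {
    private int counter;

    CountingStrategy() {
        super("Counting snapshot");
    }

    @Override
    Integer snapshot(long checkpointId) {
        return ++counter;
    }
}

final class LoggingDemo {
    public static void main(String[] args) {
        CountingStrategy strategy = new CountingStrategy();
        // Same shape as DefaultOperatorStateBackend#snapshot: time the synchronous part, then log it.
        long syncStartTime = System.currentTimeMillis();
        Integer result = strategy.snapshot(1L);
        System.out.println(strategy.syncCompletedMessage("checkpoint-1", syncStartTime));
        System.out.println(strategy + " -> " + result);
    }
}
```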
StateObject
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateObject.java
```java
/**
 * Base of all handles that represent checkpointed state in some form. The object may hold
 * the (small) state directly, or contain a file path (state is in the file), or contain the
 * metadata to access the state stored in some external database.
 *
 * <p>State objects define how to {@link #discardState() discard state} and how to access the
 * {@link #getStateSize() size of the state}.
 *
 * <p>State Objects are transported via RPC between JobManager and
 * TaskManager and must be {@link java.io.Serializable serializable} to support that.
 *
 * <p>Some State Objects are stored in the checkpoint/savepoint metadata. For long-term
 * compatibility, they are not stored via {@link java.io.Serializable Java Serialization},
 * but through custom serializers.
 */
public interface StateObject extends Serializable {

    void discardState() throws Exception;

    long getStateSize();
}
```
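- StateObject extends Serializable because state objects are transported via RPC between the JobManager and the TaskManager. The interface defines discardState, which releases the resources behind the state, and getStateSize, which returns the size of the state.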
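A minimal sketch of what the Serializable requirement buys: a handle that keeps its (small) state inline can be shipped through plain Java serialization, used here as a stand-in for the RPC transport between JobManager and TaskManager. ToyStateObject, InlineStateHandle and RpcRoundTrip are hypothetical names; as the Javadoc notes, Flink stores some state objects in checkpoint metadata through custom serializers rather than Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical mirror of the StateObject contract.
interface ToyStateObject extends Serializable {
    void discardState() throws Exception;

    long getStateSize();
}

// Holds the (small) state inline, the first option listed in the StateObject Javadoc.
final class InlineStateHandle implements ToyStateObject {
    private static final long serialVersionUID = 1L;

    private byte[] data;

    InlineStateHandle(byte[] data) {
        this.data = data.clone();
    }

    @Override
    public void discardState() {
        data = new byte[0]; // nothing external to delete; just drop the bytes
    }

    @Override
    public long getStateSize() {
        return data.length;
    }
}

final class RpcRoundTrip {
    // Java serialization round trip, standing in for shipping a handle over RPC.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InlineStateHandle received = roundTrip(new InlineStateHandle(new byte[] {1, 2, 3}));
        System.out.println(received.getStateSize()); // 3
    }
}
```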
StreamStateHandle
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StreamStateHandle.java
```java
/**
 * A {@link StateObject} that represents state that was written to a stream. The data can be read
 * back via {@link #openInputStream()}.
 */
public interface StreamStateHandle extends StateObject {

    /**
     * Returns an {@link FSDataInputStream} that can be used to read back the data that
     * was previously written to the stream.
     */
    FSDataInputStream openInputStream() throws IOException;
}
```
- StreamStateHandle extends StateObject and additionally defines openInputStream.
OperatorStateHandle
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateHandle.java
```java
/**
 * Interface of a state handle for operator state.
 */
public interface OperatorStateHandle extends StreamStateHandle {

    /**
     * Returns a map of meta data for all contained states by their name.
     */
    Map<String, StateMetaInfo> getStateNameToPartitionOffsets();

    /**
     * Returns an input stream to read the operator state information.
     */
    @Override
    FSDataInputStream openInputStream() throws IOException;

    /**
     * Returns the underlying stream state handle that points to the state data.
     */
    StreamStateHandle getDelegateStateHandle();

    //......
}
```
- OperatorStateHandle extends StreamStateHandle and additionally defines getStateNameToPartitionOffsets and getDelegateStateHandle; getStateNameToPartitionOffsets maps each state name to the offsets of its available partitions.
OperatorStreamStateHandle
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
```java
/**
 * State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a
 * map that contains the offsets to the partitions of named states in the stream.
 */
public class OperatorStreamStateHandle implements OperatorStateHandle {

    private static final long serialVersionUID = 35876522969227335L;

    /**
     * unique state name -> offsets for available partitions in the handle stream
     */
    private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
    private final StreamStateHandle delegateStateHandle;

    public OperatorStreamStateHandle(
        Map<String, StateMetaInfo> stateNameToPartitionOffsets,
        StreamStateHandle delegateStateHandle) {

        this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle);
        this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets);
    }

    @Override
    public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
        return stateNameToPartitionOffsets;
    }

    @Override
    public void discardState() throws Exception {
        delegateStateHandle.discardState();
    }

    @Override
    public long getStateSize() {
        return delegateStateHandle.getStateSize();
    }

    @Override
    public FSDataInputStream openInputStream() throws IOException {
        return delegateStateHandle.openInputStream();
    }

    @Override
    public StreamStateHandle getDelegateStateHandle() {
        return delegateStateHandle;
    }

    //......
}
```
- OperatorStreamStateHandle implements OperatorStateHandle. It holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which is exactly what getStateNameToPartitionOffsets returns; discardState, getStateSize and openInputStream all delegate to delegateStateHandle.
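The idea behind stateNameToPartitionOffsets can be illustrated with plain java.io streams: several named list states are written one after another into a single stream, and for each name we keep the offset at which every list element (partition) starts, so any single element can later be read back without scanning the rest. OffsetsSketch is a hypothetical class: it records offsets only (the distribution mode that OperatorStateHandle.StateMetaInfo also carries is left out), and it writes elements with DataOutputStream.writeUTF rather than a Flink TypeSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: named list states share one stream; name -> long[] records element offsets.
final class OffsetsSketch {
    final Map<String, long[]> stateNameToPartitionOffsets = new LinkedHashMap<>();
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(buffer);

    // Writes each element of a named list state and remembers where it starts.
    void writeListState(String name, List<String> elements) {
        long[] offsets = new long[elements.size()];
        try {
            for (int i = 0; i < elements.size(); i++) {
                offsets[i] = out.size();
                out.writeUTF(elements.get(i));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        stateNameToPartitionOffsets.put(name, offsets);
    }

    // Reads a single element back by seeking to its recorded offset.
    String readPartition(String name, int index) {
        byte[] bytes = buffer.toByteArray();
        int offset = (int) stateNameToPartitionOffsets.get(name)[index];
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(bytes, offset, bytes.length - offset))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        OffsetsSketch sketch = new OffsetsSketch();
        sketch.writeListState("words", Arrays.asList("a", "bb"));
        sketch.writeListState("ids", Arrays.asList("x"));
        System.out.println(Arrays.toString(sketch.stateNameToPartitionOffsets.get("words"))); // [0, 3]
        System.out.println(Arrays.toString(sketch.stateNameToPartitionOffsets.get("ids")));   // [7]
        System.out.println(sketch.readPartition("words", 1));                               // bb
    }
}
```

Each writeUTF call writes a 2-byte length prefix followed by the characters, which is why "a" occupies bytes 0 to 2 and "bb" starts at offset 3.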
SnapshotResult
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotResult.java
```java
/**
 * This class contains the combined results from the snapshot of a state backend:
 * <ul>
 *   <li>A state object representing the state that will be reported to the Job Manager to acknowledge the checkpoint.</li>
 *   <li>A state object that represents the state for the {@link TaskLocalStateStoreImpl}.</li>
 * </ul>
 */
public class SnapshotResult<T extends StateObject> implements StateObject {
    //......
}
```
- SnapshotResult implements StateObject and wraps the result of a snapshot, namely jobManagerOwnedSnapshot and taskLocalSnapshot. Its discardState calls discardState on both jobManagerOwnedSnapshot and taskLocalSnapshot, while getStateSize returns only the size of jobManagerOwnedSnapshot.
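A minimal sketch of the SnapshotResult semantics described above, assuming simplified handle types; ToyHandle, RecordingHandle and ToySnapshotResult are hypothetical. Discarding tries both parts and rethrows the first failure at the end (a best-effort policy chosen for this sketch), while the reported size covers only the JobManager-owned part and, in this sketch, falls back to 0 when there is none, as for an empty result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the StateObject contract.
interface ToyHandle {
    void discardState() throws Exception;

    long getStateSize();
}

// Test helper: a handle that records when it is discarded.
final class RecordingHandle implements ToyHandle {
    private final String name;
    private final long size;
    private final List<String> discardLog;

    RecordingHandle(String name, long size, List<String> discardLog) {
        this.name = name;
        this.size = size;
        this.discardLog = discardLog;
    }

    @Override
    public void discardState() {
        discardLog.add(name);
    }

    @Override
    public long getStateSize() {
        return size;
    }
}

// Sketch of the SnapshotResult idea: a JobManager-owned part plus an optional task-local copy.
final class ToySnapshotResult<T extends ToyHandle> implements ToyHandle {
    private final T jobManagerOwnedSnapshot; // null when there was nothing to snapshot
    private final T taskLocalSnapshot;       // null when no local copy is kept

    ToySnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {
        this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
        this.taskLocalSnapshot = taskLocalSnapshot;
    }

    static <T extends ToyHandle> ToySnapshotResult<T> empty() {
        return new ToySnapshotResult<>(null, null);
    }

    @Override
    public void discardState() {
        List<T> parts = new ArrayList<>();
        parts.add(jobManagerOwnedSnapshot);
        parts.add(taskLocalSnapshot);
        RuntimeException failure = null;
        for (T part : parts) {
            if (part == null) {
                continue;
            }
            try {
                part.discardState();
            } catch (Exception e) {
                // Remember the first failure but still try the remaining part.
                if (failure == null) {
                    failure = new IllegalStateException("Could not discard snapshot part", e);
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    @Override
    public long getStateSize() {
        // Only the JobManager-owned part is reported.
        return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ToySnapshotResult<RecordingHandle> result = new ToySnapshotResult<>(
            new RecordingHandle("jm", 10L, log), new RecordingHandle("local", 7L, log));
        System.out.println(result.getStateSize()); // 10
        result.discardState();
        System.out.println(log);                   // [jm, local]
    }
}
```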
DefaultOperatorStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
```java
/**
 * Default implementation of OperatorStateStore that provides the ability to make snapshots.
 */
@Internal
public class DefaultOperatorStateBackend implements OperatorStateBackend {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);

    /**
     * The default namespace for state in cases where no state name is provided
     */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    /**
     * Map for all registered operator states. Maps state name -> state
     */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;

    /**
     * Map for all registered operator broadcast states. Maps state name -> state
     */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    /**
     * CloseableRegistry to participate in the tasks lifecycle.
     */
    private final CloseableRegistry closeStreamOnCancelRegistry;

    /**
     * Default serializer. Only used for the default operator state.
     */
    private final JavaSerializer<Serializable> javaSerializer;

    /**
     * The user code classloader.
     */
    private final ClassLoader userClassloader;

    /**
     * The execution configuration.
     */
    private final ExecutionConfig executionConfig;

    /**
     * Flag to de/activate asynchronous snapshots.
     */
    private final boolean asynchronousSnapshots;

    /**
     * Map of state names to their corresponding restored state meta info.
     *
     * <p>TODO this map can be removed when eager-state registration is in place.
     * TODO we currently need this cached to check state migration strategies when new serializers are registered.
     */
    private final Map<String, StateMetaInfoSnapshot> restoredOperatorStateMetaInfos;

    /**
     * Map of state names to their corresponding restored broadcast state meta info.
     */
    private final Map<String, StateMetaInfoSnapshot> restoredBroadcastStateMetaInfos;

    /**
     * Cache of already accessed states.
     *
     * <p>In contrast to {@link #registeredOperatorStates} and {@link #restoredOperatorStateMetaInfos} which may be repopulated
     * with restored state, this map is always empty at the beginning.
     *
     * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
     *
     * @see FLINK-6849
     */
    private final HashMap<String, PartitionableListState<?>> accessedStatesByName;

    private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

    private final AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy;

    public DefaultOperatorStateBackend(
        ClassLoader userClassLoader,
        ExecutionConfig executionConfig,
        boolean asynchronousSnapshots) {

        this.closeStreamOnCancelRegistry = new CloseableRegistry();
        this.userClassloader = Preconditions.checkNotNull(userClassLoader);
        this.executionConfig = executionConfig;
        this.javaSerializer = new JavaSerializer<>();
        this.registeredOperatorStates = new HashMap<>();
        this.registeredBroadcastStates = new HashMap<>();
        this.asynchronousSnapshots = asynchronousSnapshots;
        this.accessedStatesByName = new HashMap<>();
        this.accessedBroadcastStatesByName = new HashMap<>();
        this.restoredOperatorStateMetaInfos = new HashMap<>();
        this.restoredBroadcastStateMetaInfos = new HashMap<>();
        this.snapshotStrategy = new DefaultOperatorStateBackendSnapshotStrategy();
    }

    @Override
    public Set<String> getRegisteredStateNames() {
        return registeredOperatorStates.keySet();
    }

    @Override
    public Set<String> getRegisteredBroadcastStateNames() {
        return registeredBroadcastStates.keySet();
    }

    @Override
    public void close() throws IOException {
        closeStreamOnCancelRegistry.close();
    }

    @Override
    public void dispose() {
        IOUtils.closeQuietly(closeStreamOnCancelRegistry);
        registeredOperatorStates.clear();
        registeredBroadcastStates.clear();
    }

    // -------------------------------------------------------------------------------------------
    //  State access methods
    // -------------------------------------------------------------------------------------------

    @SuppressWarnings("unchecked")
    @Override
    public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
        //......
    }

    @Override
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    @Override
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
    }

    @Nonnull
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

        long syncStartTime = System.currentTimeMillis();

        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
            snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

        snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
        return snapshotRunner;
    }

    //......
}
```
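- DefaultOperatorStateBackend implements the OperatorStateBackend interface.
- getRegisteredStateNames returns registeredOperatorStates.keySet() and getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet(); both are backed by in-memory Maps.
- close mainly calls closeStreamOnCancelRegistry.close(); dispose also closes closeStreamOnCancelRegistry (quietly, via IOUtils.closeQuietly) and additionally clears registeredOperatorStates and registeredBroadcastStates.
- getListState and getUnionListState both delegate to the private getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode), passing OperatorStateHandle.Mode.SPLIT_DISTRIBUTE and OperatorStateHandle.Mode.UNION respectively.
- snapshot delegates to snapshotStrategy, which is a DefaultOperatorStateBackendSnapshotStrategy, and logs the duration of the synchronous part via logSyncCompleted.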
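The only difference between getListState and getUnionListState is the OperatorStateHandle.Mode that is passed along, and it matters when state is redistributed after a rescale: with SPLIT_DISTRIBUTE the elements of all old list states are divided among the new parallel instances, while with UNION every new instance receives the complete list. The sketch below illustrates these semantics only. ToyMode and Redistribution are hypothetical, and the contiguous, near-equal split it uses is a simplification, not Flink's actual repartitioning algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical mirror of the two list-state assignment modes.
enum ToyMode { SPLIT_DISTRIBUTE, UNION }

final class Redistribution {
    // Redistributes per-subtask list states from the old parallelism to newParallelism.
    static <S> List<List<S>> redistribute(List<List<S>> oldSubtaskStates, int newParallelism, ToyMode mode) {
        // Conceptually, all old subtask lists first form one logical list.
        List<S> all = new ArrayList<>();
        for (List<S> subtaskState : oldSubtaskStates) {
            all.addAll(subtaskState);
        }
        List<List<S>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            if (mode == ToyMode.UNION) {
                // Union list state: every new subtask receives the complete list.
                result.add(new ArrayList<>(all));
            } else {
                // Split-distribute list state: each new subtask gets a contiguous, near-equal slice.
                int start = (int) ((long) all.size() * i / newParallelism);
                int end = (int) ((long) all.size() * (i + 1) / newParallelism);
                result.add(new ArrayList<>(all.subList(start, end)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> old = Arrays.asList(
            Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d", "e"));
        System.out.println(redistribute(old, 2, ToyMode.SPLIT_DISTRIBUTE)); // [[a, b], [c, d, e]]
        System.out.println(redistribute(old, 2, ToyMode.UNION));            // [[a, b, c, d, e], [a, b, c, d, e]]
    }
}
```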
DefaultOperatorStateBackend.getListState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
```java
private <S> ListState<S> getListState(
    ListStateDescriptor<S> stateDescriptor,
    OperatorStateHandle.Mode mode) throws StateMigrationException {

    Preconditions.checkNotNull(stateDescriptor);
    String name = Preconditions.checkNotNull(stateDescriptor.getName());

    @SuppressWarnings("unchecked")
    PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
    if (previous != null) {
        checkStateNameAndMode(
            previous.getStateMetaInfo().getName(),
            name,
            previous.getStateMetaInfo().getAssignmentMode(),
            mode);
        return previous;
    }

    // end up here if its the first time access after execution for the
    // provided state name; check compatibility of restored state, if any
    // TODO with eager registration in place, these checks should be moved to restore()

    stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

    @SuppressWarnings("unchecked")
    PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredOperatorStates.get(name);

    if (null == partitionableListState) {
        // no restored state for the state name; simply create new state holder

        partitionableListState = new PartitionableListState<>(
            new RegisteredOperatorStateBackendMetaInfo<>(
                name,
                partitionStateSerializer,
                mode));

        registeredOperatorStates.put(name, partitionableListState);
    } else {
        // has restored state; check compatibility of new state access

        checkStateNameAndMode(
            partitionableListState.getStateMetaInfo().getName(),
            name,
            partitionableListState.getStateMetaInfo().getAssignmentMode(),
            mode);

        StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
        RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
            new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);

        // check compatibility to determine if state migration is required
        TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();

        @SuppressWarnings("unchecked")
        TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
            (TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(
                StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));

        TypeSerializerSchemaCompatibility<S> stateCompatibility =
            stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
        if (stateCompatibility.isIncompatible()) {
            throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
        }

        partitionableListState.setStateMetaInfo(
            new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
    }

    accessedStatesByName.put(name, partitionableListState);
    return partitionableListState;
}
```
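- The method first looks in the accessedStatesByName cache; if the state was already accessed, it checks the name and assignment mode and returns the cached state. Otherwise it looks up the PartitionableListState in registeredOperatorStates. If there is none (no restored state), a new one is created and registered. If there is one (restored state), the name and mode are checked, the restored serializer snapshot is used to check schema compatibility with the new serializer (throwing StateMigrationException if they are incompatible), and the new stateMetaInfo is then set on partitionableListState. Finally, the state is put into accessedStatesByName.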
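The control flow of the private getListState (cache lookup, then get-or-create in registeredOperatorStates, then a compatibility check for restored state) can be modelled in a few dozen lines. Everything below is hypothetical: a String serializerId stands in for the TypeSerializer and its snapshot, compatibility is reduced to equality instead of resolveSchemaCompatibility, the setStateMetaInfo step is omitted, and an IllegalStateException replaces Flink's checked StateMigrationException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mirror of OperatorStateHandle.Mode.
enum AssignmentMode { SPLIT_DISTRIBUTE, UNION }

// Hypothetical stand-in for PartitionableListState plus its meta info.
final class ToyListState {
    final String name;
    final AssignmentMode mode;
    final String serializerId; // stand-in for the element TypeSerializer
    final List<Object> elements = new ArrayList<>();

    ToyListState(String name, AssignmentMode mode, String serializerId) {
        this.name = name;
        this.mode = mode;
        this.serializerId = serializerId;
    }
}

final class ToyOperatorStateBackend {
    final Map<String, ToyListState> registeredOperatorStates = new HashMap<>();
    private final Map<String, ToyListState> accessedStatesByName = new HashMap<>();

    // Simulates restore(): the state is registered but not yet accessed in this execution.
    void restore(String name, AssignmentMode mode, String serializerId, List<Object> elements) {
        ToyListState restored = new ToyListState(name, mode, serializerId);
        restored.elements.addAll(elements);
        registeredOperatorStates.put(name, restored);
    }

    ToyListState getListState(String name, AssignmentMode mode, String serializerId) {
        // 1. Fast path: already accessed, only the mode has to match.
        ToyListState previous = accessedStatesByName.get(name);
        if (previous != null) {
            checkMode(previous, mode);
            return previous;
        }
        ToyListState state = registeredOperatorStates.get(name);
        if (state == null) {
            // 2. Nothing restored under this name: create and register a new holder.
            state = new ToyListState(name, mode, serializerId);
            registeredOperatorStates.put(name, state);
        } else {
            // 3. Restored state: check the mode and (simplified) serializer compatibility.
            checkMode(state, mode);
            if (!state.serializerId.equals(serializerId)) {
                throw new IllegalStateException(
                    "The new state serializer for operator state must not be incompatible.");
            }
        }
        // 4. Cache it so later calls take the fast path.
        accessedStatesByName.put(name, state);
        return state;
    }

    private static void checkMode(ToyListState state, AssignmentMode requested) {
        if (state.mode != requested) {
            throw new IllegalStateException("State '" + state.name + "' uses mode " + state.mode
                + " but " + requested + " was requested.");
        }
    }
}
```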
DefaultOperatorStateBackendSnapshotStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
```java
/**
 * Snapshot strategy for this backend.
 */
private class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {

    protected DefaultOperatorStateBackendSnapshotStrategy() {
        super("DefaultOperatorStateBackend snapshot");
    }

    @Nonnull
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull final CheckpointOptions checkpointOptions) throws IOException {

        if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }

        final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
            new HashMap<>(registeredOperatorStates.size());
        final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
            new HashMap<>(registeredBroadcastStates.size());

        ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(userClassloader);
        try {
            // eagerly create deep copies of the list and the broadcast states (if any)
            // in the synchronous phase, so that we can use them in the async writing.

            if (!registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> listState = entry.getValue();
                    if (null != listState) {
                        listState = listState.deepCopy();
                    }
                    registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                }
            }

            if (!registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                    if (null != broadcastState) {
                        broadcastState = broadcastState.deepCopy();
                    }
                    registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                }
            }
        } finally {
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }

        AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
            new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

                @Override
                protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {

                    CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                        streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                    registerCloseableForCancellation(localOut);

                    // get the registered operator state infos ...
                    List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
                        new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                    for (Map.Entry<String, PartitionableListState<?>> entry :
                        registeredOperatorStatesDeepCopies.entrySet()) {
                        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }

                    // ... get the registered broadcast operator state infos ...
                    List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
                        new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                        registeredBroadcastStatesDeepCopies.entrySet()) {
                        broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }

                    // ... write them all in the checkpoint stream ...
                    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                    OperatorBackendSerializationProxy backendSerializationProxy =
                        new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                    backendSerializationProxy.write(dov);

                    // ... and then go for the states ...

                    // we put BOTH normal and broadcast state metadata here
                    int initialMapCapacity =
                        registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
                    final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                        new HashMap<>(initialMapCapacity);

                    for (Map.Entry<String, PartitionableListState<?>> entry :
                        registeredOperatorStatesDeepCopies.entrySet()) {

                        PartitionableListState<?> value = entry.getValue();
                        long[] partitionOffsets = value.write(localOut);
                        OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                        writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                    }

                    // ... and the broadcast states themselves ...
                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                        registeredBroadcastStatesDeepCopies.entrySet()) {

                        BackendWritableBroadcastState<?, ?> value = entry.getValue();
                        long[] partitionOffsets = {value.write(localOut)};
                        OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                        writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                    }

                    // ... and, finally, create the state handle.
                    OperatorStateHandle retValue = null;

                    if (unregisterCloseableFromCancellation(localOut)) {

                        StreamStateHandle stateHandle = localOut.closeAndGetHandle();

                        if (stateHandle != null) {
                            retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                        }

                        return SnapshotResult.of(retValue);
                    } else {
                        throw new IOException("Stream was already unregistered.");
                    }
                }

                @Override
                protected void cleanupProvidedResources() {
                    // nothing to do
                }

                @Override
                protected void logAsyncSnapshotComplete(long startTime) {
                    if (asynchronousSnapshots) {
                        logAsyncCompleted(streamFactory, startTime);
                    }
                }
            };

        final FutureTask<SnapshotResult<OperatorStateHandle>> task =
            snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);

        if (!asynchronousSnapshots) {
            task.run();
        }

        return task;
    }
}
```
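- DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy. If no state is registered, its snapshot method returns an already-completed DoneFuture holding SnapshotResult.empty(). Otherwise it deep-copies the registered states into registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies during the synchronous phase (with the user classloader set as the context classloader) and leaves the actual writing to an AsyncSnapshotCallable. If asynchronousSnapshots is false, the resulting FutureTask is run immediately, before it is returned.
- AsyncSnapshotCallable is an abstract class that implements the call method of the Callable interface; call invokes callInternal and then logAsyncSnapshotComplete.
- The callInternal method of this AsyncSnapshotCallable returns a SnapshotResult<OperatorStateHandle>. It writes the state meta info (via OperatorBackendSerializationProxy) and then the data of registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies to a CheckpointStateOutputStream created by the CheckpointStreamFactory (for example MemCheckpointStreamFactory), recording each state's partition offsets and assignment mode in writtenStatesMetaData. Finally it builds an OperatorStreamStateHandle from writtenStatesMetaData and the stateHandle returned by the stream's closeAndGetHandle, and wraps it in a SnapshotResult.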
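The core idea of DefaultOperatorStateBackendSnapshotStrategy, copying the state in the synchronous phase and writing only the copies in the asynchronous phase, can be sketched with a FutureTask. DeepCopySnapshotSketch is hypothetical: it keeps List<String> states in a map, returns the copies instead of writing them to a checkpoint stream, and runs the task immediately when asynchronousSnapshots is false. The main method shows that an update made after snapshot() returns does not leak into the snapshot.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical sketch of the "copy synchronously, write asynchronously" structure.
final class DeepCopySnapshotSketch {
    private final Map<String, List<String>> registeredStates = new LinkedHashMap<>();
    private final boolean asynchronousSnapshots;

    DeepCopySnapshotSketch(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    void add(String stateName, String value) {
        registeredStates.computeIfAbsent(stateName, k -> new ArrayList<>()).add(value);
    }

    RunnableFuture<Map<String, List<String>>> snapshot() {
        if (registeredStates.isEmpty()) {
            // Nothing registered: hand back an already-completed future.
            FutureTask<Map<String, List<String>>> done =
                new FutureTask<>(() -> Collections.<String, List<String>>emptyMap());
            done.run();
            return done;
        }
        // Synchronous phase: copy every list. Strings are immutable, so copying the lists is enough here.
        final Map<String, List<String>> deepCopies = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : registeredStates.entrySet()) {
            deepCopies.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        // Asynchronous phase: only the copies are touched; a real backend would write them to a stream.
        FutureTask<Map<String, List<String>>> task = new FutureTask<>(() -> deepCopies);
        if (!asynchronousSnapshots) {
            task.run();
        }
        return task;
    }

    static <T> T await(RunnableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DeepCopySnapshotSketch backend = new DeepCopySnapshotSketch(true);
        backend.add("buffer", "a");
        RunnableFuture<Map<String, List<String>>> future = backend.snapshot();
        backend.add("buffer", "b");        // happens after the synchronous phase
        future.run();                      // the asynchronous part, run here on the same thread
        System.out.println(await(future)); // {buffer=[a]}
    }
}
```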
Summary
- The OperatorStateBackend interface extends OperatorStateStore, Snapshotable, Closeable and Disposable.
- OperatorStateStore defines getBroadcastState, getListState and getUnionListState to create or restore a BroadcastState or ListState, and getRegisteredStateNames and getRegisteredBroadcastStateNames to return the names of the registered states. DefaultOperatorStateBackend implements OperatorStateStore: getRegisteredStateNames returns registeredOperatorStates.keySet() and getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet() (registeredOperatorStates and registeredBroadcastStates are both in-memory Maps); getListState and getUnionListState both call getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode).
- Snapshotable extends SnapshotStrategy and adds a restore method for restoring state. SnapshotStrategy defines the snapshot method, which each snapshot strategy implements, and requires the snapshot result to be a StateObject. AbstractSnapshotStrategy is an abstract class that does not implement snapshot; it only provides logSyncCompleted and logAsyncCompleted for debug logging.
- DefaultOperatorStateBackend implements Snapshotable, and its snapshot method uses a DefaultOperatorStateBackendSnapshotStrategy. That strategy extends AbstractSnapshotStrategy; its snapshot method first creates registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies and then does the work in an AsyncSnapshotCallable, which writes the data of those deep copies to a CheckpointStateOutputStream from the CheckpointStreamFactory (for example MemCheckpointStreamFactory) and records the per-state offsets in writtenStatesMetaData.
- Snapshotable requires its snapshot type parameter S to be a StateObject. StateObject extends Serializable because it is transported via RPC between the JobManager and the TaskManager. When OperatorStateBackend extends Snapshotable, it binds S to SnapshotResult<OperatorStateHandle> and the restore type R to Collection<OperatorStateHandle>.
- StreamStateHandle extends StateObject and adds openInputStream. OperatorStateHandle extends StreamStateHandle and adds getStateNameToPartitionOffsets and getDelegateStateHandle, where getStateNameToPartitionOffsets maps each state name to the offsets of its available partitions. OperatorStreamStateHandle implements OperatorStateHandle and holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which is what getStateNameToPartitionOffsets returns.
- SnapshotResult implements StateObject and wraps the snapshot results jobManagerOwnedSnapshot and taskLocalSnapshot. Its discardState calls discardState on both jobManagerOwnedSnapshot and taskLocalSnapshot, while getStateSize returns the size of jobManagerOwnedSnapshot.