Flink: Custom Serialization for Managed State

Original post
2022/02/13 23:29

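This page is a guide for users who need custom serialization for their state. It covers how to provide a custom state serializer, along with guidelines and best practices for implementing serializers that allow state schema evolution.

If you are simply using Flink's own serializers, this page is irrelevant and can be ignored.

Using custom state serializers #

When registering a managed operator or keyed state, a StateDescriptor is required to specify the state's name, as well as information about the type of the state. Flink's type serialization framework uses this type information to create appropriate serializers for the state.

It is also possible to bypass this completely and let Flink use your own custom serializer to serialize managed state, simply by instantiating the StateDescriptor directly with your own TypeSerializer implementation: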

Java
public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "state-name",
        new CustomTypeSerializer());

checkpointedState = getRuntimeContext().getListState(descriptor);
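
The body of CustomTypeSerializer is elided above. As a rough, stand-alone illustration of the read/write core such a serializer has to provide, the sketch below uses only java.io and no Flink classes. The class name PairCodecSketch, the use of Map.Entry in place of Tuple2, and the chosen byte layout are all assumptions made here for illustration, not part of the original example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical stand-in for the serialize/deserialize pair of a custom serializer.
final class PairCodecSketch {

    // Assumed binary format: a UTF string followed by a 4-byte int.
    static byte[] serialize(Map.Entry<String, Integer> value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(value.getKey());
            out.writeInt(value.getValue());
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in exactly the order they were written.
    static Map.Entry<String, Integer> deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            String key = in.readUTF();
            int count = in.readInt();
            return new SimpleEntry<>(key, count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> restored =
            deserialize(serialize(new SimpleEntry<>("clicks", 3)));
        System.out.println(restored.getKey() + " -> " + restored.getValue());
    }
}
```

A real TypeSerializer has further responsibilities (copying, equality, producing a snapshot) that this sketch leaves out.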

State serializers and schema evolution #

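This section explains the user-facing abstractions related to state serialization and schema evolution, and the necessary internal details about how Flink interacts with these abstractions.

When restoring from savepoints, Flink allows changing the serializers used to read and write previously registered state, so that users are not locked in to any specific serialization schema. When state is restored, a new serializer is registered for the state (i.e., the serializer that comes with the StateDescriptor used to access the state in the restored job). This new serializer may have a different schema than that of the previous serializer. Therefore, when implementing state serializers, besides the basic logic of reading and writing data, another important thing to keep in mind is how the serialization schema can be changed in the future.

When speaking of schema, in this context the term is interchangeable between the data model of a state type and the serialized binary format of a state type. Generally speaking, the schema can change in a few cases:

  1. The data schema of the state type has evolved, e.g. a field was added to or removed from a POJO that is used as state.
  2. Generally speaking, after a change to the data schema, the serialization format of the serializer will need to be upgraded.
  3. The configuration of the serializer has changed.

In order for the new execution to have information about the written schema of state and to detect whether or not the schema has changed, a snapshot of the state serializer needs to be written along with the state bytes when a savepoint of an operator's state is taken. This is abstracted as a TypeSerializerSnapshot, explained below.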

public interface TypeSerializerSnapshot<T> {
    int getCurrentVersion();
    void writeSnapshot(DataOutputView out) throws IOException;
    void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
    TypeSerializer<T> restoreSerializer();
}
public abstract class TypeSerializer<T> {    
    
    // ...
    
    public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
}

A serializer’s TypeSerializerSnapshot is point-in-time information that serves as the single source of truth about the state serializer’s write schema, as well as any additional information needed to restore a serializer identical to the one at that point in time. What is written and read at restore time as the serializer snapshot is defined in the writeSnapshot and readSnapshot methods.

Note that the snapshot’s own write schema may also need to change over time (e.g. when you wish to add more information about the serializer to the snapshot). To facilitate this, snapshots are versioned, with the current version number defined in the getCurrentVersion method. On restore, when the serializer snapshot is read from savepoints, the version of the schema in which the snapshot was written is provided to the readSnapshot method so that the read implementation can handle different versions.
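
To make the versioning idea concrete, here is a minimal stand-alone sketch that uses plain java.io instead of Flink's DataOutputView and DataInputView. The class VersionedSnapshotSketch, its two configuration fields, and the helpers that store the version number in front of the snapshot body are hypothetical. They only emulate the contract described above: write the newest format, and branch on the version when reading.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical snapshot of a serializer whose configuration grew over time.
final class VersionedSnapshotSketch {

    // Version 1 wrote only charsetName; version 2 appends lowerCaseKeys.
    static final int CURRENT_VERSION = 2;

    String charsetName = "UTF-8";
    boolean lowerCaseKeys = false;

    int getCurrentVersion() {
        return CURRENT_VERSION;
    }

    // Always writes the newest format.
    void writeSnapshot(DataOutput out) throws IOException {
        out.writeUTF(charsetName);
        out.writeBoolean(lowerCaseKeys);
    }

    // Branches on the version the bytes were written with.
    void readSnapshot(int readVersion, DataInput in) throws IOException {
        charsetName = in.readUTF();
        // The flag did not exist in version 1, so it falls back to false there.
        lowerCaseKeys = readVersion >= 2 && in.readBoolean();
    }

    // Emulates the surrounding framework: the version is stored in front of the body.
    static byte[] writeWithVersion(VersionedSnapshotSketch snapshot) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(snapshot.getCurrentVersion());
            snapshot.writeSnapshot(out);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static VersionedSnapshotSketch readWithVersion(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int readVersion = in.readInt();
            VersionedSnapshotSketch snapshot = new VersionedSnapshotSketch();
            snapshot.readSnapshot(readVersion, in);
            return snapshot;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bytes as an old job (snapshot version 1) would have written them.
    static byte[] legacyVersion1Bytes(String charsetName) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(1);
            out.writeUTF(charsetName);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        VersionedSnapshotSketch old = readWithVersion(legacyVersion1Bytes("ISO-8859-1"));
        System.out.println(old.charsetName + ", lowerCaseKeys=" + old.lowerCaseKeys);
    }
}
```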

At restore time, the logic that detects whether or not the new serializer’s schema has changed should be implemented in the resolveSchemaCompatibility method. When previously registered state is registered again with new serializers in the restored execution of an operator, the new serializer is provided to the previous serializer’s snapshot via this method. This method returns a TypeSerializerSchemaCompatibility representing the result of the compatibility resolution, which can be one of the following:

  1. TypeSerializerSchemaCompatibility.compatibleAsIs(): this result signals that the new serializer is compatible, meaning that the new serializer has identical schema with the previous serializer. It is possible that the new serializer has been reconfigured in the resolveSchemaCompatibility method so that it is compatible.
  2. TypeSerializerSchemaCompatibility.compatibleAfterMigration(): this result signals that the new serializer has a different serialization schema, and it is possible to migrate from the old schema by using the previous serializer (which recognizes the old schema) to read bytes into state objects, and then rewriting the object back to bytes with the new serializer (which recognizes the new schema).
  3. TypeSerializerSchemaCompatibility.incompatible(): this result signals that the new serializer has a different serialization schema, but it is not possible to migrate from the old schema.

The last bit of detail is how the previous serializer is obtained in the case that migration is required. Another important role of a serializer’s TypeSerializerSnapshot is that it serves as a factory to restore the previous serializer. More specifically, the TypeSerializerSnapshot should implement the restoreSerializer method to instantiate a serializer instance that recognizes the previous serializer’s schema and configuration, and can therefore safely read data written by the previous serializer.

How Flink interacts with the TypeSerializer and TypeSerializerSnapshot abstractions #

To wrap up, this section summarizes how Flink, or more specifically the state backends, interacts with the abstractions. The interaction differs slightly depending on the state backend, but this is orthogonal to the implementation of state serializers and their serializer snapshots.

Off-heap state backends (e.g. RocksDBStateBackend) #

  1. Register new state with a state serializer that has schema A
  • The registered TypeSerializer for the state is used to read / write state on every state access.
  • State is written in schema A.
  2. Take a savepoint
  • The serializer snapshot is extracted via the TypeSerializer#snapshotConfiguration method.
  • The serializer snapshot is written to the savepoint, as well as the already-serialized state bytes (with schema A).
  3. Restored execution re-accesses restored state bytes with new state serializer that has schema B
  • The previous state serializer’s snapshot is restored.
  • State bytes are not deserialized on restore, only loaded back to the state backends (therefore, still in schema A).
  • Upon receiving the new serializer, it is provided to the restored previous serializer’s snapshot via TypeSerializerSnapshot#resolveSchemaCompatibility to check for schema compatibility.
  4. Migrate state bytes in backend from schema A to schema B
  • If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is performed. The previous state serializer, which recognizes schema A, is obtained from the serializer snapshot via TypeSerializerSnapshot#restoreSerializer() and is used to deserialize state bytes to objects, which in turn are rewritten with the new serializer, which recognizes schema B, to complete the migration. All entries of the accessed state are migrated together before processing continues.
  • If the resolution signals incompatibility, then the state access fails with an exception.
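
The migration step can be modelled in a few lines of plain Java. The following is a simplified, hypothetical sketch and not how RocksDB or Flink implement it internally. The "backend" is just a map of already-serialized bytes, Codec stands in for a serializer of a long-valued state, schema A is assumed to store the value as a 4-byte int, and schema B as an 8-byte long.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class OffHeapMigrationSketch {

    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    // Hypothetical stand-in for a serializer of a long-valued state.
    interface Codec {
        byte[] write(long value);
        long read(byte[] bytes);
    }

    // Schema A (assumed): the value is stored as a 4-byte int.
    static final Codec SCHEMA_A = new Codec() {
        @Override public byte[] write(long value) {
            return ByteBuffer.allocate(4).putInt(Math.toIntExact(value)).array();
        }
        @Override public long read(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getInt();
        }
    };

    // Schema B (assumed): the value is stored as an 8-byte long.
    static final Codec SCHEMA_B = new Codec() {
        @Override public byte[] write(long value) {
            return ByteBuffer.allocate(8).putLong(value).array();
        }
        @Override public long read(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getLong();
        }
    };

    // Called when restored state is first accessed with the new serializer.
    static void onStateAccess(Map<String, byte[]> backendBytes, Compatibility resolution,
                              Codec restoredPrevious, Codec newCodec) {
        switch (resolution) {
            case COMPATIBLE_AS_IS:
                return; // bytes can be read by the new serializer as they are
            case COMPATIBLE_AFTER_MIGRATION:
                // Read every entry with the old schema and rewrite it with the new one.
                for (Map.Entry<String, byte[]> entry : backendBytes.entrySet()) {
                    long value = restoredPrevious.read(entry.getValue());
                    entry.setValue(newCodec.write(value));
                }
                return;
            default:
                throw new IllegalStateException("New serializer is incompatible with restored state");
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> backendBytes = new HashMap<>();
        backendBytes.put("user-1", SCHEMA_A.write(42L));
        onStateAccess(backendBytes, Compatibility.COMPATIBLE_AFTER_MIGRATION, SCHEMA_A, SCHEMA_B);
        System.out.println(SCHEMA_B.read(backendBytes.get("user-1")));
    }
}
```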

Heap state backends (e.g. MemoryStateBackend, FsStateBackend) #

  1. Register new state with a state serializer that has schema A
  • The registered TypeSerializer is maintained by the state backend.
  2. Take a savepoint, serializing all state with schema A
  • The serializer snapshot is extracted via the TypeSerializer#snapshotConfiguration method.
  • The serializer snapshot is written to the savepoint.
  • State objects are now serialized to the savepoint, written in schema A.
  3. On restore, deserialize state into objects in heap
  • The previous state serializer’s snapshot is restored.
  • The previous serializer, which recognizes schema A, is obtained from the serializer snapshot via TypeSerializerSnapshot#restoreSerializer() and is used to deserialize state bytes to objects.
  • From now on, all of the state is already deserialized.
  4. Restored execution re-accesses previous state with new state serializer that has schema B
  • Upon receiving the new serializer, it is provided to the restored previous serializer’s snapshot via TypeSerializerSnapshot#resolveSchemaCompatibility to check for schema compatibility.
  • If the compatibility check signals that migration is required, nothing happens in this case, since for heap backends all state is already deserialized into objects.
  • If the resolution signals incompatibility, then the state access fails with an exception.
  5. Take another savepoint, serializing all state with schema B
  • Same as step 2, but now state bytes are all in schema B.
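
For contrast with the off-heap case, the heap flow can be sketched as follows. This is again a hypothetical stand-alone model: HeapBackendSketch and the two assumed byte layouts (4-byte int for schema A, 8-byte long for schema B) are illustrative only. Live state is held as objects, bytes exist only inside savepoints, and a schema change simply takes effect at the next savepoint.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class HeapBackendSketch {

    // Schema A (assumed): 4-byte int. Schema B (assumed): 8-byte long.
    static final Function<Long, byte[]> WRITE_A =
        v -> ByteBuffer.allocate(4).putInt(Math.toIntExact(v)).array();
    static final Function<byte[], Long> READ_A =
        b -> (long) ByteBuffer.wrap(b).getInt();
    static final Function<Long, byte[]> WRITE_B =
        v -> ByteBuffer.allocate(8).putLong(v).array();

    // Live state is kept as plain objects on the heap.
    final Map<String, Long> objects = new HashMap<>();

    // Taking a savepoint serializes every object with the currently registered writer.
    Map<String, byte[]> takeSavepoint(Function<Long, byte[]> writer) {
        Map<String, byte[]> savepoint = new HashMap<>();
        objects.forEach((key, value) -> savepoint.put(key, writer.apply(value)));
        return savepoint;
    }

    // Restoring eagerly deserializes everything with the previous schema's reader.
    static HeapBackendSketch restore(Map<String, byte[]> savepoint,
                                     Function<byte[], Long> previousReader) {
        HeapBackendSketch backend = new HeapBackendSketch();
        savepoint.forEach((key, bytes) -> backend.objects.put(key, previousReader.apply(bytes)));
        return backend;
    }

    public static void main(String[] args) {
        HeapBackendSketch first = new HeapBackendSketch();
        first.objects.put("user-1", 7L);
        Map<String, byte[]> savepointA = first.takeSavepoint(WRITE_A);

        // No byte-level migration is needed: objects are already in memory,
        // and the next savepoint is simply written with schema B.
        HeapBackendSketch restored = restore(savepointA, READ_A);
        Map<String, byte[]> savepointB = restored.takeSavepoint(WRITE_B);
        System.out.println(savepointA.get("user-1").length + " -> " + savepointB.get("user-1").length);
    }
}
```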

Predefined convenient TypeSerializerSnapshot classes #

Flink provides two abstract base TypeSerializerSnapshot classes that can be used for typical scenarios: SimpleTypeSerializerSnapshot and CompositeTypeSerializerSnapshot.

Serializers that provide these predefined snapshots as their serializer snapshot must always have their own, independent subclass implementation. This corresponds to the best practice of not sharing snapshot classes across different serializers, which is more thoroughly explained in the next section.

Implementing a SimpleTypeSerializerSnapshot #

The SimpleTypeSerializerSnapshot is intended for serializers that do not have any state or configuration, essentially meaning that the serialization schema of the serializer is solely defined by the serializer’s class.

There will only be 2 possible results of the compatibility resolution when using the SimpleTypeSerializerSnapshot as your serializer’s snapshot class:

  • TypeSerializerSchemaCompatibility.compatibleAsIs(), if the new serializer class remains identical, or
  • TypeSerializerSchemaCompatibility.incompatible(), if the new serializer class is different than the previous one.

Below is an example of how the SimpleTypeSerializerSnapshot is used, using Flink’s IntSerializer as an example:

public class IntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
    public IntSerializerSnapshot() {
        super(() -> IntSerializer.INSTANCE);
    }
}

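IntSerializer has no state or configuration. Its serialization format is defined solely by the serializer class itself, and can only be read by another IntSerializer. Therefore, it suits the use case of the SimpleTypeSerializerSnapshot.

The base super constructor of SimpleTypeSerializerSnapshot expects a Supplier of instances of the corresponding serializer, regardless of whether the snapshot is currently being restored or being written during snapshotting. That supplier is used to create the restore serializer, as well as for type checks to verify that the new serializer is of the same expected serializer class.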
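
The two roles of the supplier (creating the restore serializer and backing the class check) can be mimicked without Flink. The sketch below is a hypothetical model, not the actual SimpleTypeSerializerSnapshot code: SimpleSnapshotSketch, IntCodec, and LongCodec are made-up names, and the compatibility check is reduced to a plain class comparison.

```java
import java.util.function.Supplier;

final class SimpleSnapshotSketch {

    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    // Hypothetical stateless serializers; their class alone defines the format.
    static final class IntCodec { }
    static final class LongCodec { }

    private final Supplier<?> serializerSupplier;

    SimpleSnapshotSketch(Supplier<?> serializerSupplier) {
        this.serializerSupplier = serializerSupplier;
    }

    // Role 1: the supplier acts as the factory for the restore serializer.
    Object restoreSerializer() {
        return serializerSupplier.get();
    }

    // Role 2: the supplier provides the expected class for the type check.
    Compatibility resolveSchemaCompatibility(Object newSerializer) {
        Class<?> expected = serializerSupplier.get().getClass();
        return newSerializer.getClass() == expected
            ? Compatibility.COMPATIBLE_AS_IS
            : Compatibility.INCOMPATIBLE;
    }

    public static void main(String[] args) {
        SimpleSnapshotSketch snapshot = new SimpleSnapshotSketch(IntCodec::new);
        System.out.println(snapshot.resolveSchemaCompatibility(new IntCodec()));
        System.out.println(snapshot.resolveSchemaCompatibility(new LongCodec()));
    }
}
```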

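Implementing a CompositeTypeSerializerSnapshot #

The CompositeTypeSerializerSnapshot is intended for serializers that rely on multiple nested serializers for serialization.

Before further explanation, we call the serializer that relies on multiple nested serializers the "outer" serializer in this context. Examples include MapSerializer, ListSerializer, GenericArraySerializer, etc. Consider the MapSerializer: the key and value serializers would be the nested serializers, while MapSerializer itself is the "outer" serializer.

In this case, the snapshot of the outer serializer should also contain snapshots of the nested serializers, so that the compatibility of the nested serializers can be checked independently. When resolving the compatibility of the outer serializer, the compatibility of each nested serializer needs to be considered.

CompositeTypeSerializerSnapshot is provided to assist in implementing snapshots for these kinds of composite serializers. It deals with reading and writing the nested serializer snapshots, as well as resolving the final compatibility result, taking into account the compatibility of all nested serializers.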
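
One plausible way to picture how the nested results feed into the final result is the small model below. It is an assumption-laden simplification rather than Flink's implementation: it only knows the three outcomes listed earlier on this page, and it assumes that any incompatible part makes the whole incompatible, while any part that needs migration makes the whole require migration. The class name CompositeResolutionSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class CompositeResolutionSketch {

    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    // Combines the outer result with the results of all nested serializers.
    static Compatibility resolve(Compatibility outer, List<Compatibility> nested) {
        if (outer == Compatibility.INCOMPATIBLE) {
            return Compatibility.INCOMPATIBLE;
        }
        boolean needsMigration = outer == Compatibility.COMPATIBLE_AFTER_MIGRATION;
        for (Compatibility result : nested) {
            if (result == Compatibility.INCOMPATIBLE) {
                // A single incompatible nested serializer rules out the whole composite.
                return Compatibility.INCOMPATIBLE;
            }
            if (result == Compatibility.COMPATIBLE_AFTER_MIGRATION) {
                needsMigration = true;
            }
        }
        return needsMigration
            ? Compatibility.COMPATIBLE_AFTER_MIGRATION
            : Compatibility.COMPATIBLE_AS_IS;
    }

    public static void main(String[] args) {
        // Example: a map-like serializer whose value serializer changed its format.
        Compatibility keyResult = Compatibility.COMPATIBLE_AS_IS;
        Compatibility valueResult = Compatibility.COMPATIBLE_AFTER_MIGRATION;
        System.out.println(
            resolve(Compatibility.COMPATIBLE_AS_IS, Arrays.asList(keyResult, valueResult)));
    }
}
```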

Below is an example of how the CompositeTypeSerializerSnapshot is used, using Flink's MapSerializer as an example:

public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {

    private static final int CURRENT_VERSION = 1;

    public MapSerializerSnapshot() {
        super(MapSerializer.class);
    }

    public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
        super(mapSerializer);
    }

    @Override
    public int getCurrentOuterSnapshotVersion() {
        return CURRENT_VERSION;
    }

    @Override
    protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
        TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
        TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
        return new MapSerializer<>(keySerializer, valueSerializer);
    }

    @Override
    protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) {
        return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
    }
}

When implementing a new serializer snapshot as a subclass of CompositeTypeSerializerSnapshot, the following three methods must be implemented:

  • #getCurrentOuterSnapshotVersion(): This method defines the version of the current outer serializer snapshot’s serialized binary format.
  • #getNestedSerializers(TypeSerializer): Given the outer serializer, returns its nested serializers.
  • #createOuterSerializerWithNestedSerializers(TypeSerializer[]): Given the nested serializers, create an instance of the outer serializer.

The above example is a CompositeTypeSerializerSnapshot where there is no extra information to be snapshotted apart from the nested serializers' snapshots. Therefore, its outer snapshot version can be expected to never require an uptick. Some other serializers, however, contain additional static configuration that needs to be persisted along with the nested component serializers. An example is Flink’s GenericArraySerializer, which, besides the nested element serializer, contains the class of the array element type as configuration.

In these cases, an additional three methods need to be implemented on the CompositeTypeSerializerSnapshot:

  • #writeOuterSnapshot(DataOutputView): defines how the outer snapshot information is written.
  • #readOuterSnapshot(int, DataInputView, ClassLoader): defines how the outer snapshot information is read.
  • #resolveOuterSchemaCompatibility(TypeSerializer): checks the compatibility based on the outer snapshot information.

By default, the CompositeTypeSerializerSnapshot assumes that there isn’t any outer snapshot information to read / write, and therefore has empty default implementations for the above methods. If the subclass has outer snapshot information, then all three methods must be implemented.

Below is an example of how the CompositeTypeSerializerSnapshot is used for composite serializer snapshots that do have outer snapshot information, using Flink’s GenericArraySerializer as an example:

public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {

    private static final int CURRENT_VERSION = 1;

    private Class<C> componentClass;

    public GenericArraySerializerSnapshot() {
        super(GenericArraySerializer.class);
    }

    public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
        super(genericArraySerializer);
        this.componentClass = genericArraySerializer.getComponentClass();
    }

    @Override
    protected int getCurrentOuterSnapshotVersion() {
        return CURRENT_VERSION;
    }

    @Override
    protected void writeOuterSnapshot(DataOutputView out) throws IOException {
        out.writeUTF(componentClass.getName());
    }

    @Override
    protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
    }

    @Override
    protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
        return (this.componentClass == newSerializer.getComponentClass())
            ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
            : OuterSchemaCompatibility.INCOMPATIBLE;
    }

    @Override
    protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
        TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0];
        return new GenericArraySerializer<>(componentClass, componentSerializer);
    }

    @Override
    protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
        return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() };
    }
}

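There are two important things to notice in the above code snippet. First, since this CompositeTypeSerializerSnapshot implementation has outer snapshot information that is written as part of the snapshot, the outer snapshot version, as defined by getCurrentOuterSnapshotVersion(), must be upticked whenever the serialization format of the outer snapshot information changes.

Second, notice how we avoid using Java serialization when writing the component class, by only writing the class name and dynamically loading it when reading back the snapshot. Avoiding Java serialization for writing the contents of serializer snapshots is in general a good practice to follow. More details about this are covered in the next section.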

Implementation notes and best practices #

1. Flink restores serializer snapshots by instantiating them with their classname #

A serializer’s snapshot, being the single source of truth for how a registered state was serialized, serves as an entry point to reading state in savepoints. In order to be able to restore and access previous state, the previous state serializer’s snapshot must be able to be restored.

Flink restores serializer snapshots by first instantiating the TypeSerializerSnapshot with its classname (written along with the snapshot bytes). Therefore, to avoid being subject to unintended classname changes or instantiation failures, TypeSerializerSnapshot classes should:

  • avoid being implemented as anonymous classes or nested classes, and
  • have a public, nullary constructor for instantiation.
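
Restoring by class name essentially comes down to reflective instantiation, which is why the nullary constructor matters. The following is a minimal sketch under that assumption. It uses only standard Java reflection, is not Flink's actual restore code, and the three class names are hypothetical.

```java
// Hypothetical restore helper: load a class by its recorded name and instantiate it.
final class SnapshotInstantiationSketch {

    static Object instantiate(String className, ClassLoader loader) {
        try {
            // getConstructor() only finds a public constructor without parameters.
            return Class.forName(className, true, loader).getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot restore snapshot class " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = SnapshotInstantiationSketch.class.getClassLoader();
        System.out.println(
            instantiate(RestorableSnapshotSketch.class.getName(), loader).getClass().getName());
        try {
            instantiate(ArgsOnlySnapshotSketch.class.getName(), loader);
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}

// Top-level class with a public no-argument constructor: restorable by name.
class RestorableSnapshotSketch {
    public RestorableSnapshotSketch() { }
}

// Only a constructor with arguments: reflective instantiation by name fails.
class ArgsOnlySnapshotSketch {
    public ArgsOnlySnapshotSketch(int version) { }
}
```

Anonymous classes are risky for a related reason: the compiler assigns their names, so a name recorded in an old savepoint may no longer resolve to the same class after the code is edited.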

2. Avoid sharing the same TypeSerializerSnapshot class across different serializers #

Since schema compatibility checks go through the serializer snapshots, having multiple serializers return the same TypeSerializerSnapshot class as their snapshot would complicate the implementation of the TypeSerializerSnapshot#resolveSchemaCompatibility and TypeSerializerSnapshot#restoreSerializer() methods.

This would also be a bad separation of concerns; a single serializer’s serialization schema, configuration, as well as how to restore it, should be consolidated in its own dedicated TypeSerializerSnapshot class.

3. Avoid using Java serialization for serializer snapshot content #

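Java serialization should not be used at all when writing the contents of a persisted serializer snapshot. Take, for example, a serializer that needs to persist the class of its target type as part of its snapshot. Information about the class should be persisted by writing the class name, instead of directly serializing the class using Java serialization. When reading the snapshot, the class name is read and used to dynamically load the class by name.

This practice ensures that serializer snapshots can always be safely read. In the above example, if the type class had been persisted using Java serialization, the snapshot may no longer be readable once the class implementation has changed and is no longer binary compatible according to Java serialization specifics.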
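
In isolation, the class-name approach might look roughly like the sketch below. It relies only on java.io and Class.forName. ClassNameSnapshotSketch and its two helper methods are hypothetical names, and primitive types such as int.class are deliberately not handled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class ClassNameSnapshotSketch {

    // Persist only the fully qualified name instead of a Java-serialized object.
    static byte[] writeTypeClass(Class<?> typeClass) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(typeClass.getName());
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the name back and resolve it through the given class loader.
    // Note: Class.forName does not resolve primitive names such as "int".
    static Class<?> readTypeClass(byte[] data, ClassLoader userCodeClassLoader) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            return Class.forName(in.readUTF(), false, userCodeClassLoader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Type class is no longer on the classpath", e);
        }
    }

    public static void main(String[] args) {
        byte[] snapshotBytes = writeTypeClass(java.util.ArrayList.class);
        Class<?> restored =
            readTypeClass(snapshotBytes, ClassNameSnapshotSketch.class.getClassLoader());
        System.out.println(restored.getName());
    }
}
```

Because only the name is stored, the class can change its fields or methods freely. The snapshot stays readable as long as a class with that name can still be loaded.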

Migrating from deprecated serializer snapshot APIs before Flink 1.7 #

This section is a guide for API migration from serializers and serializer snapshots that existed before Flink 1.7.

Before Flink 1.7, serializer snapshots were implemented as a TypeSerializerConfigSnapshot (which is now deprecated, and will eventually be removed in the future to be fully replaced by the new TypeSerializerSnapshot interface). Moreover, the responsibility of serializer schema compatibility checks lived within the TypeSerializer, implemented in the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) method.

Another major difference between the new and old abstractions is that the deprecated TypeSerializerConfigSnapshot did not have the capability of instantiating the previous serializer. Therefore, in the case where your serializer still returns a subclass of TypeSerializerConfigSnapshot as its snapshot, the serializer instance itself will always be written to savepoints using Java serialization so that the previous serializer may be available at restore time. This is very undesirable, since whether or not restoring the job will be successful depends on the availability of the previous serializer’s class, or in general, on whether or not the serializer instance can be read back at restore time using Java serialization. This means that you are limited to the same serializer for your state, which could be problematic once you want to upgrade serializer classes or perform schema migration.

To be future-proof and have the flexibility to migrate your state serializers and schema, it is highly recommended to migrate from the old abstractions. The steps to do this are as follows:

  1. Implement a new subclass of TypeSerializerSnapshot. This will be the new snapshot for your serializer.
  2. Return the new TypeSerializerSnapshot as the serializer snapshot for your serializer in the TypeSerializer#snapshotConfiguration() method.
  3. Restore the job from the savepoint that existed before Flink 1.7, and then take a savepoint again. Note that at this step, the old TypeSerializerConfigSnapshot of the serializer must still exist in the classpath, and the implementation for the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) method must not be removed. The purpose of this process is to replace the TypeSerializerConfigSnapshot written in old savepoints with the newly implemented TypeSerializerSnapshot for the serializer.
  4. Once you have a savepoint taken with Flink 1.7, the savepoint will contain TypeSerializerSnapshot as the state serializer snapshot, and the serializer instance will no longer be written in the savepoint. At this point, it is now safe to remove all implementations of the old abstraction (remove the old TypeSerializerConfigSnapshot implementation as well as the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) from the serializer).