flink执行配置

原创
2022/02/13 23:59
阅读数 731

StreamExecutionEnvironment包含ExecutionConfig,它允许为运行时设置特定于作业的配置值。若要更改影响所有作业的默认值,请参见Configuration。

Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();

setClosureCleanerLevel()。闭包清洁器级别设置为ClosureCleanerLevel。默认情况下递归。闭包清理程序会删除对Flink程序中匿名函数类的不需要的引用。禁用闭包清理器后,可能会发生匿名用户函数引用周围类的情况,这通常是不可序列化的。这将导致序列化器异常。设置如下:NONE:完全禁用闭包清理器,TOP_LEVEL:只清理顶级类而不递归到字段中,RECURSIVE:递归地清理所有字段。

getParallelism() / setParallelism(int parallelism)设置任务的默认并行度。

getMaxParallelism() / setMaxParallelism(int parallelism)为任务设置默认的最大并行度。此设置确定最大并行度,并指定动态扩展的上限。

getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries)设置失败的任务重新执行的次数。值为零将有效地禁用容错功能。-1表示使用系统默认值(如配置中定义的)。这是不赞成的,使用重启策略代替。

getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay)设置一个任务失败后系统重新执行之前的延迟时间(以毫秒为单位)。当taskmanager上的所有任务都成功停止后,延迟就开始了,一旦延迟过去,任务就会重新启动。这个参数对于延迟重新执行非常有用,以便让某些超时相关的失败完全浮出表面(比如没有完全超时的断开连接),然后再尝试重新执行,然后由于相同的问题再次失败。此参数仅在执行重试次数为一个或多个时有效。这是不赞成的,使用重启策略代替。

getExecutionMode () / setExecutionMode()。默认的执行模式是pipelining。设置执行模式以执行程序。执行模式定义了数据交换是以批处理方式还是以流水线方式执行。

/ disableForceKryo enableForceKryo()。默认情况下,Kryo不是强制的。强制GenericTypeInformation为POJO使用Kryo序列化器,即使我们可以将它们作为POJO来分析。在某些情况下,这可能更可取。例如,当Flink的内部序列化器不能正确处理POJO时。

enableForceAvro () / disableForceAvro()。Avro在默认情况下是不强制的。强制Flink AvroTypeInfo使用Avro序列化器而不是Kryo来序列化Avro pojo。

enableObjectReuse() / disableObjectReuse()缺省情况下,Flink中不重用对象。启用对象重用模式将指示运行时重用用户对象以获得更好的性能。请记住,当操作的用户代码函数不知道这种行为时,这可能会导致错误。

该方法允许用户设置自定义对象作为作业的全局配置。因为在所有用户定义的函数中都可以访问ExecutionConfig,所以这是使配置在作业中全局可用的一种简单方法。

addDefaultKryoSerializer(类< ?>类型,序列化器< ?为给定类型注册一个Kryo序列化器实例。

addDefaultKryoSerializer(类< ?>类型,类< ?扩展了序列化器< ?为给定类型注册一个Kryo序列化器类。

registerTypeWithKryoSerializer(类< ?>类型,序列化器< ?向Kryo注册给定的类型,并为它指定一个序列化器。通过向Kryo注册类型,类型的序列化将更加有效。

registerKryoType(类< ?如果该类型最终被Kryo序列化,那么它将在Kryo注册,以确保只写入标签(整数id)。如果类型没有向Kryo注册,那么它的整个类名将与每个实例一起序列化,从而导致更高的I/O成本。

registerPojoType(类< ?向序列化堆栈注册给定的类型。如果类型最终被序列化为POJO,那么该类型将被注册到POJO序列化器中。如果类型最终被Kryo序列化,那么它将被注册在Kryo,以确保只写入标签。如果类型没有向Kryo注册,那么它的整个类名将与每个实例一起序列化,从而导致更高的I/O成本。

请注意,在registerKryoType()中注册的类型对于Flink的POJO序列化器实例是不可用的。

disableautotyperregistration()默认启用类型自动注册。自动类型注册是在Kryo和POJO序列化器中注册用户代码使用的所有类型(包括子类型)。

setTaskCancellationInterval(long interval)设置连续尝试取消正在运行的任务的间隔时间(毫秒)。当一个任务被取消时,会创建一个新线程,如果该任务线程在一段时间内没有终止,则会周期性地调用该任务线程上的interrupt()。该参数表示连续调用interrupt()的时间间隔,默认设置为30000毫秒,即30秒。

在Rich*函数中,可以通过getRuntimeContext()方法访问RuntimeContext,它还允许访问所有用户定义函数中的ExecutionConfig。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部