flink细粒度资源管理

原创
2022/02/14 01:24
阅读数 138

Apache Flink努力为所有应用程序自动派生合理的默认资源需求。对于希望根据特定场景的知识调整资源消耗的用户,Flink提供了细粒度的资源管理。

这个页面描述了细粒度资源管理的使用、适用场景以及它是如何工作的。

注意:此特性目前是MVP(“最小可行产品”)特性,仅对DataStream API可用。
适用场景#
可能受益于细粒度资源管理的典型场景如下:

任务具有明显不同的并行性。

整个管道所需的资源太多,一个插槽/任务管理器容纳不下。

批处理任务,不同阶段的任务所需的资源明显不同

在如何提高资源效率中,将深入讨论为什么细粒度资源管理可以提高上述场景的资源效率。

它是如何工作的#
正如在Flink架构中所描述的,TaskManager中的任务执行资源被分割成许多槽。插槽是Flink运行时资源调度和资源需求的基本单位。

通过细粒度的资源管理,槽请求包含特定的资源概要,用户可以指定这些概要。Flink将尊重这些用户指定的资源需求,并动态地从TaskManager的可用资源中切割出一个精确匹配的槽。如上所示,需要一个0.25 Core、1GB内存的插槽,Flink为它分配了插槽1。

以前在Flink中,资源需求只包含所需的槽,没有细粒度的资源配置文件,即粗粒度的资源管理。TaskManager有固定数量的相同槽位来满足这些要求。
对于没有指定资源配置文件的资源需求,Flink将自动确定一个资源配置文件。目前,它的资源配置文件是根据TaskManager的总资源和任务管理器计算出来的。numberOfTaskSlots,就像在粗粒度资源管理中一样。如上所示,TaskManager的总资源为1 Core,内存为4gb,任务槽位数为2,在没有指定资源配置文件的情况下,创建槽位2,使用0.5 Core,内存为2gb。

分配完Slot 1和Slot 2后,TaskManager还剩下0.25 Core和1gb内存作为空闲资源。这些空闲资源可以进一步分区,以满足以下资源需求。

详情请参阅资源分配策略。

使用#
要使用细粒度的资源管理,您需要:

通过配置启用细粒度资源管理。

指定资源需求。

启用细粒度资源管理#
为了实现细粒度资源管理,需要配置cluster.fine-grain -resource-management。启用为true。

如果没有这个配置,Flink运行时就不能根据指定的资源需求调度槽,作业将会异常失败。
为槽位共享组#指定资源需求
细粒度资源需求是在槽共享组上定义的。插槽共享组是一个提示,它告诉JobManager中的操作符/任务可以放在相同的插槽中。

要指定资源需求,你需要:

定义槽共享组及其包含的操作符。

指定槽位共享组的资源。

有两种方法定义槽共享组及其包含的操作符:

只能通过插槽共享组的名称定义插槽共享组,并通过slotSharingGroup(String name)将其附加到操作符。

您可以构造一个SlotSharingGroup实例,该实例包含槽位共享组的名称和可选的资源配置文件。SlotSharingGroup可以通过SlotSharingGroup (ssg)绑定给运营商。

您可以为插槽共享组指定资源配置文件:

如果通过slotSharingGroup(slotSharingGroup ssg)设置槽位共享组,则可以在构造slotSharingGroup实例时指定资源配置文件。

如果只使用slotSharingGroup(String name)设置槽位共享组名。您可以构造具有相同名称的SlotSharingGroup实例以及资源配置文件,并使用StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup ssg)注册它们的资源。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SlotSharingGroup ssgA = SlotSharingGroup.newBuilder("a")
  .setCpuCores(1.0)
  .setTaskHeapMemoryMB(100)
  .build();

SlotSharingGroup ssgB = SlotSharingGroup.newBuilder("b")
  .setCpuCores(0.5)
  .setTaskHeapMemoryMB(100)
  .build();

someStream.filter(...).slotSharingGroup("a") // Set the slot sharing group with name “a”
.map(...).slotSharingGroup(ssgB); // Directly set the slot sharing group with name and resource.

env.registerSlotSharingGroup(ssgA); // Then register the resource of group “a”

在构造槽位共享组时,可以为槽位共享组设置如下资源组件:

CPU核心。定义需要多少CPU核。需要显式地配置为正值。
任务堆内存。定义需要多少任务堆内存。需要显式地配置为正值。
任务堆外内存。定义需要多少任务堆外内存,可以为0。
内存管理。定义需要多少任务管理内存,可以为0。
外部资源。定义所需的外部资源,可以为空。

// Directly build a slot sharing group with specific resource
SlotSharingGroup ssgWithResource =
    SlotSharingGroup.newBuilder("ssg")
        .setCpuCores(1.0) // required
        .setTaskHeapMemoryMB(100) // required
        .setTaskOffHeapMemoryMB(50)
        .setManagedMemory(MemorySize.ofMebiBytes(200))
        .setExternalResource("gpu", 1.0)
        .build();

// Build a slot sharing group without specific resource and then register the resource of it in StreamExecutionEnvironment
SlotSharingGroup ssgWithName = SlotSharingGroup.newBuilder("ssg").build();
env.registerSlotSharingGroup(ssgWithResource);

Limitations #

由于细粒度资源管理是一个新的、实验性的特性,并不是默认调度器支持的所有特性都可以使用它。Flink社区正在致力于解决这些限制。

不支持弹性缩放。弹性伸缩目前只支持没有指定资源的槽位请求。

不支持任务管理器冗余。slotmanager。redundancy -taskmanager-num用于启动冗余taskmanager,以加快任务恢复速度。此配置选项暂时不会在细粒度资源管理中生效。

不支持均匀分布槽策略。这种策略试图在所有可用的任务管理器中均匀地分配槽位。在第一个版本的细粒度资源管理和集群中不支持该策略。均匀展开的插槽暂时不会在它中生效。

与Flink Web UI的有限集成。细粒度资源管理中的槽可以具有不同的资源规范。web UI目前只显示槽位号,不显示槽位号的详细信息。

与批处理作业的有限集成。目前,细粒度资源管理需要执行批处理工作负载,所有边缘的类型都是BLOCKING。为此,您需要配置细粒度的。shuffle-mode。all-blocking为true。注意,这可能会影响性能。详情请参阅FLINK-20865。

不建议使用混合资源。不建议仅为作业的某些部分指定资源需求,其余部分不指定需求。目前,未指定的需求可以用任何资源的槽来满足。它所获得的实际资源可能在不同的作业执行或故障转移之间不一致。

插槽分配结果可能不是最优的。由于槽位需求包含多个维度的资源,因此槽位分配实际上是一个多维的装箱问题,具有NP-hard。默认的资源分配策略可能无法实现最优的槽位分配,在某些场景下可能导致资源分片或资源分配失败。

注意#
设置槽位共享组可能会影响性能。将可链操作符设置为不同的插槽共享组可能会打破操作符链,从而影响性能。

插槽共享组不会限制运营商的调度。插槽共享组仅提示调度器可以将分组操作符部署到共享插槽中。无法保证调度器总是将分组操作符部署在一起。在分组操作符被部署到单独的槽的情况下,槽资源将从指定的组需求派生出来。

它是如何提高资源效率的
在本节中,我们将深入探讨细粒度资源管理如何提高资源效率,这可以帮助您理解它是否有利于您的工作。

以前,Flink采用了一种粗粒度的资源管理方法,在这种方法中,任务被部署到预定义的、通常相同的槽中,而不知道每个槽包含多少资源。对于许多作业来说,使用粗粒度的资源管理并简单地将所有任务放到一个槽共享组中就可以很好地利用资源。

对于许多所有任务都具有相同并行性的流作业,每个槽将包含一个完整的管道。理想情况下,所有管道应该使用大致相同的资源,这可以通过调整相同槽的资源轻松满足。

任务的资源消耗随时间变化。当一个任务的消耗减少时,额外的资源可以被另一个任务使用,而这个任务的消耗正在增加。这被称为削峰填谷效应,减少了所需的总资源。

但是,在某些情况下,粗粒度资源管理不能很好地工作。

任务可能有不同的并行性。有时,这种不同的并行性是无法避免的。例如,源/汇/查找任务的并行性可能会受到外部上游/下游系统的分区和IO负载的限制。在这种情况下,任务较少的插槽需要的资源将比那些拥有整个任务管道的插槽少。

有时候,整个管道所需的资源可能太多,无法放入单个槽/TaskManager中。在这种情况下,需要将管道拆分为多个ssg,这些ssg可能并不总是具有相同的资源需求。

对于批处理作业,不能同时执行所有任务。因此,管道的瞬时资源需求随时间而变化。

试图用相同的槽执行所有任务可能会导致资源利用率不佳。相同槽位的资源必须能够满足最高的资源需求,这将浪费在其他需求上。当涉及昂贵的外部资源(如GPU)时,这种浪费将变得更加难以承受。细粒度资源管理利用不同资源的槽位来提高此类场景下的资源利用率。

资源分配策略#
在本节中,我们将讨论Flink运行时的槽分区机制和资源分配策略,包括Flink运行时如何选择TaskManager来切割槽,以及如何在Native Kubernetes和YARN上分配TaskManager。请注意,资源分配策略在Flink运行时是可插入的,这里我们在细粒度资源管理的第一步中介绍了它的默认实现。在未来,用户可能会为不同的场景选择不同的策略。

正如它的工作原理一节所描述的,Flink将从TaskManager中为带有指定资源的槽请求切出一个完全匹配的槽。内部流程如图所示。TaskManager将以总资源启动,但没有预定义的槽位。当一个0.25 Core和1GB内存的插槽请求到达时,Flink将选择一个具有足够空闲资源的TaskManager,并使用请求的资源创建一个新的插槽。如果一个槽被释放,它将返回它的资源给TaskManager的可用资源。

在当前的资源分配策略中,Flink将遍历所有已注册的taskmanager,并选择第一个拥有足够空闲资源来满足槽请求的taskmanager。当没有足够空闲资源的TaskManager时,Flink会在Native Kubernetes或YARN上部署新的TaskManager。在当前的策略中,Flink将根据用户的配置分配相同的任务管理器。由于taskmanager的资源规范是预定义的:

集群中可能存在资源碎片。例如,如果有两个插槽请求,堆内存为3gb,而TaskManager的总堆内存为4gb,那么Flink将启动两个TaskManager,每个TaskManager将浪费1gb的堆内存。在未来,可能会有一种资源分配策略,可以根据作业的槽位请求分配异构的任务管理器,从而减轻资源片段的影响。

您需要确保为槽位共享组配置的资源组件不大于TaskManager的总资源。否则,您的工作将异常失败。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部