Ray -分布式计算框架架构设计详解 v2

我是 Andy.Qin,一个想创造哆啦 A 梦的 Maker,这次带来的是分布式计算框架 Ray 在 2022 年 8 月份才发布的 v2 版本的架构设计的中文详解,翻译自他们的架构设计文档英文版,可能这是第一篇目前关于 Ray 中文资料最详细的博文了。如果在翻译上有问题或者有任何问题欢迎发送邮件到 [email protected]

综述

Ray 是一个为了给分布式提供通用的 API 发明出来的分布式计算框架,希望通过简单但通用的抽象编程方式,让系统自动完成所有的工作。Ray 的设计者基于这个理念让 Ray 可以跟 Python 紧密相连,能够通过很少的代码就能处理业务,而其它的并行、分布式内存管理等问题都不用担心,Ray 会根据这些资源的情况自动调度和缩放。

其次 Ray 希望能够对应用程序的一些系统级行为进行控制,如系统的环境变量、参数、故障处理等等。

Ray使 用了包括分布式引用计数和分布式内存等组件,这些组件增加了体系结构的复杂性,但对性能和可靠性来说是必需的。

Ray 构建在 gRPC 的基础上,并且在许多情况下与 gRPC 直接调用的性能一致。与单独使用 gRPC 相比,Ray 使应用程序更容易利用并行和分布式执行、分布式内存共享(通过共享内存对象存储)和动态创建轻量级服务(如通过 gRPC  调用 Actor)。

为了提高可靠性,Ray 的内部协议旨在确保故障期间的正确性只会增加非常低的性能开销。Ray 实现了分布式引用计数协议,以确保内存安全帮助从故障中恢复。

由于 Ray 用户考虑用资源而不是机器来表示他们的计算资源,因此 Ray 应用程序可以简单地从笔记本电脑扩展到集群,而无需任何代码更改。Ray 的分布式调度器和对象管理器旨在实现这种无缝扩展,且开销低。

核心相关系统

Cluster Orchestrators

Ray 提供了更简单运行在 Kubernetes 的方法,可以使用 KubeRay Operator ,它提供了一种在 k8s 中管理 Ray 集群的解决方案,每个 Ray 集群由一个 head 节点和一群 woker 节点组成。你可以通过 KubeRay 来根据所需调整集群大小,同时也支持 GPU 之类的异构计算,也支持有多个版本的 Ray 的集群。

Parallelization Frameworks

与 multiprocessing 或者 Celery 之类的框架相比,Ray 提供了更通用且性能更高的 API。同时支持内存共享。

Data Processing Frameworks

与 Spark、Flink 等框架相比,Ray 提供的 API 更加底层和灵活,更适合作为 “distributed glue” 框架。另一方面 Ray 没有限定一定是数据处理的模式,而是通过功能库的方式提供不同的处理模式。

Actor Frameworks

不像 Erlang 和 Akka 之类的框架,Ray 支持跨语言的操作和使用那个语言原生的库,能够透明地管理无状态的并行计算,显式地支持 Actor 之间共享内存。

HPC Systems

许多 HPC 系统公开了更低级的消息传递接口,虽然很灵活,但开发人员需要付出更多时间和成本。Ray 上的应用程序可以通过初始化 Ray 的 Actor 组之间的通信群来利用这些优化后的 communication primitives。(类似 allreduce)

2.0 带来的新特性

  • 原本的 Global Control Store 改名叫 Global Control Service,简称 GCS,有着全新的设计更加简单和可靠。
  • 分布式调度器(包括调度策略和置放群组)能让你更方便地扩展功能。
  • 在可靠性和容错性方面进行改进,包括从故障节点中恢复 object reconstruction 和 GCS 的容错机制。
  • 增加了像 KubeRay 等方便集群管理的一些工具。

架构设计概述

主要概念解释

Task

一个用于远程调用的函数,在不同的调用方的进程上执行,也可以是在不同的机器上执行。Task 可以是无状态的也可以是有状态的(如 Actor)。

Object

应用所需的值,这些是任务返回的或者通过 ray.put创建的值,这些对象是一旦创建就不可以修改的。可以通过 ObjectRef 引用。以下也可能被称为对象。

Actor

有状态的工作进程,Actor 的任务必须使用特定的方式提交给指定的实例,可以在执行过程中修改 Actor 的内部状态。你可以理解为是一个常驻进程,或者是有状态的 Task。

Driver

程序的 root 或者是主程序,一般指放 ray.init 的代码的应用。

Job

来自同个 Driver 的 Task 和 Actor 的集合,Driver 和 Job 是 1:1 映射关系。是一个逻辑上的概念,其含义为运行一次用户侧代码所所涉及到的所有生成的 Task 以及产生的状态的集合。

设计

协议概览(大部分通过 gRPC):
a. 任务执行,对象引用计数。
b. 本地资源管理。
c. 远程/分布式资源管理。
d. 分布式对象传输。
e. 大型对象的存储和检索。检索是通过 ray.get 或在任务执行过程中进行。或者用对象的值替换一个任务的ObjectID参数时。
f. 调度器从远程节点获取对象,以满足本地排队任务的依赖满足。

组件

Ray 集群是由一个或者多个 worker 节点组成,每个 worker 节点由以下物理进程组成:

  • 一个或多个的 worker 进程,负责任务的提交和执行,worker 进程要么是无状态的,要么是一个 actor。初始工作线程由机器的 CPU 数量决定。每个工作节点会存储:
  • 一个 ownership 表,worker 引用的对象的系统元数据,例如引用技术和对象位置。
  • 进程内存储,存放一些小对象。
  • raylet,用于管理每个节点上的共享资源,与工作进程不同的是,raylet 是在所有 worker  中共享的:
  • Scheduler,负责资源管理、任务放置和完成将 Task 的参数存储在分布式的 Object Store 中。
  • Object Store,一个共享内存存储,也被称为 Plasma Object Store。负责存储、转移和溢出(spilling,如果 Object Store 满了会移动到外部存储)大型对象。集群中各个 Object Store 共同构建了 Ray 的分布式对象存储。

每一个工作进程和 raylet 都被分配了一个唯一的 28-byte 的标识符和一个 ip 地址、端口。

同样的地址和端口在工作进程死亡后重新恢复时可以重复使用,但是唯一 ID 不会。工作进程和 raylet 是 fate-share 的,一个出故障另外一个就无法使用了。

其中有个节点会被指定为 Head 节点,除了有上述进程外还会托管 GCS 和 Driver。在新版本的 GCS 是一个管理集群的元数据的服务器,比如 actor 的位置、worker 存储的 key-value 对等。GCS 还管理少量的集群几笔的操作包括调度预占用组和 actor 以及确定集群中哪些是成员。一般来说 GCS 中保存的数据很少被调用,但是可以被集群中几乎所有的 worker 节点使用。GCS 容错机制是在 v2 版本中加入的,它可以运行在任何节点或者多个节点,之前只能在指定的节点。

Dirver 是一个用于指定的用于运行最上级的代码的应用的节点,它能提交任务但是并不能在自己上面执行。虽然 Driver 可以在任何节点上运行,但默认情况下只在 Head 节点运行。

Head 节点还包含了其它类似集群级别服务的自动缩放、任务提交等等。

Ownership

大多数的系统是通过一种叫做 Ownership 的分散控制的方式管理的,这个方式是指每一个 ObjectRef 都是由所在的 worker 进程管理的,该 wo rker 或者也被叫做 owner 需要确保 Task 的执行、创建 value。

一般有两种方式去创建 ObjectRef,在下面两个例子中,owner 都是实际运行的 worker 的进程。

  • x_ref = f.remote()
  • x_ref = ray.put()

换句话来说 owner 是生成和初始化  ObjectRef 的 worker,如果 ObjectRef 由 Task 返回,那么这个值是由远程 worker 创建的而不是拿到返回值的 worker。

在 2.0 版本中,这个方式带来了更好的性能和更简单的结构、提升了可靠性,每个 application 是相对独立的,一个远程调用故障了并不会影响另一个。

但 ownership 还是存在一些问题,像如果要解析 ObjectRef ,就必须能够访问对象的 owner,这意味着 object 和 owner 是 fate-share(一个挂掉,另一个一起挂掉)。其次是目前无法转移所有权。

内存模型

Ray 通过以下方式使用内存:

  1. Ray 的 worker 在执行任务或者运行 Actor 时会使用堆内存,由于 Ray 的 Task 和 Actor 一般是并行运行,开发人员应该关注每个 Task 的堆内存的情况。如果内存压力过大,Ray 会自动释放掉消耗内存大的进程。
  2. 当一个 worker 调用 ray.put() 或者从一个 Task 返回时,它会将提供的值复制到 Ray 的共享内存对象存储中。然后 Ray 会让这些对象在整个集群中可访问,在发生故障时尝试恢复它们,如果对象存储超过其配置的容量,则将它们转移其它存储设备,并在所有 ObjectRef超出范围时将它们垃圾回收。对于可以被 zero-copy 的反序列化的值,会在取出时将指向共享缓冲区的指针给 worker,其它则是被反序列化到接收的 worker 的堆内存中。
  3. 如果 Object 足够小(默认 100 kb),Ray 将直接把值存储在 owner 的内存中,而不是在 Raylet 里的共享对象存储。任何其它使用这个对象的 worker 都会把值直接复制到自己的内存里。同样 Ray 也会对他们进行垃圾回收。
  4. Ray 的元数据也会使用堆内存,大部分元数据都很小,可能就几 kb。例如:
  • GCS 的所有 Actor、所有节点、所有的预占用组集群。
  • Raylet 的本地排队的 Task、这些任务的对象参数、对象。
  • Worker 的提交了等待处理的任务或者可能需要重新通过  lineage reconstruction 执行的。拥有的对象等等。

语言运行时

所有 Ray 核心组件都在 C++ 中实现。Ray 通过一个称为“core worker”的 C++ 库支持Python、Java 和 C++ 前端。该库实现了所有权表、进程内存储,并管理与其他工作程序和 raylet 的 gRPC 通信。

Task 的生命周期

所有者需要能够指定被提交来的任务并且将 ObjectRef 解析成一个普通的值。Driver 去 Raylet 中请求需要的值,将值和 Task A 都交给 Worker 1 运行,Driver 拥有 Task A 结果的所有权,而 Worker 1 有 TaskB 的所有权。

所有者可以将普通的 Python 对象作为任务参数传递,如果参数传递的值很小,会直接将这个值从所有者的内存中复制到 Task 中,让执行者可以直接引用。如果传递的参数很大,所有者会先通过 ray.put 放入共享对象存储,然后将  ObjectRef 作为参数传递。

Ray 会在每一次自动进行上面的流程,如果你喜欢两个 Task 共用一个请显式调用 put。

所有者也可以直接将其它的  ObjectRef 作为任务参数传递,如果 ObjectRef 对应的值很小,会直接放到 Task 的 specification 中,否则传递  ObjectRef 。任务执行时会将  ObjectRef  解析成具体的值。

一旦所有任务依赖项就绪,所有者就从分布式调度器请求资源来执行任务。分布式调度器尝试获取资源,并通过分布式内存将 Task 的 specification 中的任何   ObjectRef  参数获取到本地节点。一旦资源和参数都可用,调度程序就会批准请求。

所有者通过 gRPC 将 Task 的 specification 发送给 worker 来调度。执行 Task 后 worker 必须存储返回的值。如果返回的值很小会直接返回给所有者,如果很大会存到共享内存存储将  ObjectRef 返回,允许所有者引用返回值而不需要先拿到本地节点。

当 Ray 的 Task 第一次被调用时,它会被存储到 GCS 中,稍后会由被租用的 worker 获取出函数的定义进行运行。

Task 可能在运行过程中可能会出现应用级错误(worker 进程仍然是活跃的状态)或者是系统级错误(worker 进程已经死亡或者故障)中断抛出。

默认情况下,由于应用程序级别错误而失败的任务不会自动重试。异常被捕获并存储为任务的返回值。在 2.0 中,用户可以传递应用程序级异常的白名单,Ray 可以自动重试。由于系统级错误而失败的任务会自动重试,你可以指定最多重试次数。

Object 的生命周期

对象是一个不可变的值,可以从 Ray 集群中的任何位置存储和引用。对象的所有者是通过提交创建任务或调用 ray.put 创建初始化 ObjectRef 的 worker。所有者负责管理对象的生存期。Ray 保证如果所有者活着,对象最终可能会被解析为其值(或者在工作程序失败的情况下抛出错误)。如果所有者已死亡,尝试获取对象的值将引发异常,即使仍然存在对象的物理副本。

每个工作程序存储其拥有的对象的引用计数。仅在以下操作期间计算引用:

  • 向任务传递 ObjectRef 或包含 ObjectRef 作为参数的对象。
  • 从任务中返回 ObjectRef 或包含 ObjectRef 的对象。

对象可以存储在所有者的进程内存存储或分布式对象存储中。进程内内存存储是在所有者的堆上分配的,不强制限制存储量。因为 Ray 只存储很小的对象。过多的小对象存储在内存中可能会引起内存不足的问题而导致进程被结束。存储在分布式对象存储的对象首先会存储在共享内存存储中,共享内存存储默认是机器内存的 30%,在达到上限后转移到本地磁盘上。

你可以通过 ray.getObjectRef 转为实际的值或者是将 ObjectRef 作为参数传递,具体的执行者会自动解析。

如果出现系统级的故障,对象存储在分布式内存存储中,并且该对象的所有副本都因 raylet 故障而丢失,则该对象就丢失了。Ray 会尝试通过重建的方式去恢复这个对象,如果所有者进程也死亡了,则无法重建。

Actor 的生命周期

当在 Python 中创建 Actor 时,将构建一个特殊任务,称为 Actor 创建任务,该任务运行 Actor 的 Python 构造函数。创建的 worker 等待创建任务的所有依赖项就绪,类似于普通任务。一旦完成, worker 将向 GCS 异步注册参与者。GCS 通过调度 Acotr 创建任务来创建 Actor。这与普通任务的调度类似,只是在 Actor 进程的生命周期内获取其指定的资源。

同时,创建 actor 的 Python 调用立即返回一个 “actor handle” ,即使尚未安排 actor 创建任务,也可以使用该句柄。在参与者创建任务完成之前,不会调度将 actor handle 相关的任务。有关详细信息,请参见 Actor 创作。

Actor 的执行与正常任务执行类似,主要有两个区别:

  • 默认情况下 Actor 已经不需要从调度器中获取资源了,当被创建时就已经获取了。
  • 对于 Actor 的每个调用方,任务的执行顺序与提交顺序相同。

当 Actor 的创建者退出时且集群中没有其它还没结束的 actor handle 时会自动被清理。当然你也可以显式清理。

在某些情况下,可能不需要顺序执行。为了支持这样的场景,Ray 还提供了一个选项,通过它可以使用事件循环并发运行任务,或者使用线程并行运行多个任务。从调用者的角度来看,向这些参与者提交任务与向常规参与者提交任务相同。唯一的区别是,当任务在参与者上运行时,它被发布到后台线程或线程池,而不是直接在主线程上运行。Ray API(如任务提交和 Ray.get)是线程安全的,但用户需要负责其它部分的线程安全。

故障模型

系统模型

Ray 集群中的任意一个非 Head 的节点的丢失是不影响集群的,Head 节点托管了 GCS,但在 2.0 中,允许 GCS 重启到其它节点来减少对集群的干扰。

所有节点都被分配了一个唯一的标识符,并通过心跳相互通信。GCS 负责决定集群的成员资格,即哪些节点当前处于活动状态。GCS 将删除任何超时的节点 ID,这意味着必须使用不同的节点 ID 在该节点上启动新的 raylet,以便重新使用物理资源。如果一条仍然活跃的的 Ray 收到它已经超时,它就会退出。节点的故障检测当前不处理网络分区问题:如果从 GCS 分区了工作节点,它将超时并标记为不可用。

每个 raylet 向 GCS 报告任何本地的 worker 进程的死亡情况。GCS 会广播这些故障事件,并使用它们让位置在故障节点的已注册的 Actor 死亡。所有 worker 进程的命运都与其节点上的 raylet 共享。

raylets 负责防止 worker 工作进程失败后集群资源和系统状态发生泄漏问题。对于失败的 worker 进程(本地或远程),每个 raylet 负责:

  • 通过杀死任何故障的 worker 进程来释放任务执行所需的集群资源,如 CPU。故障的worker 发出的任何未完成的资源请求也将被取消。
  • 释放该 worker 持有的在分布式对象内存存储的对象。

应用程序模型

系统故障模型意味着 Ray 里面的任务和对象将与所有者共享命运。例如,如果在这种情况下运行“a”的 worker 失败,那么将收集在其子树(图中灰色的 “b” 和 “z”)中创建的所有对象和任务。如果 “b” 是在 “a” 的子树中创建的 actor(参见 Actor 死亡),则同样适用。

  • 如果尝试获取这个发生了错误的对象的值,任何活动进程会收到应用级异常。如上图, Driver 会在 get 结果时收到异常。
  • 你可以通过让不同的 Task 放到不同的子树(调用嵌套的函数),故障可以彼此隔离。
  • 应用程序与 Driver 是命运共享的,如果 Driver 挂了整个执行过程都会故障。

如果希望避免命运共享(fate-share),可以将它变为独立的 Actor 就不会受到 Driver 证明周期的影响,变为 Actor 后只能通过显式调用方法来销毁它。

1.3 开始 Ray 可以通过将对象放到其它存储空间来实现持久化,2.0 开始 Ray 默认会为普通 Task 启用对象重建。

Object 管理

通常,小的对象存储在所有者的进程内存储中,而大的对象存储在分布式对象存储中。为了减少每个对象的内存占用和解析时间。注意,在后一种情况下,占位对象(placeholder object)存储在进程内存储中,以表明该对象实际上存储在分布式对象存储中。

进程内存储中的对象可以通过内存复制快速解析,但由于额外的复制,当许多进程引用时,可能会占用更大的内存。单个工作程序的进程内存储的容量也受限于该机器的内存容量,从而限制了在任何给定时间可以引用的此类对象的总数。对于多次引用的对象,吞吐量也可能受到所有者进程处理能力的限制。

相比之下,解析分布式对象存储中的对象需要至少一个从工作程序到工作程序的本地共享内存存储的 RPC。如果工作进程的本地共享内存存储尚未包含对象的副本,则可能需要其它 RPC 连接。另一方面,因为共享内存存储是用共享内存实现的,所以同一节点上的多个工作人员可以引用对象的同一副本。如果对象可以,这可以减少总内存占用。分布式内存的使用还允许进程引用对象,而不需要对象本地,这意味着 zero-copy 反序列化可以使用超过单个机器的内存总容量限制。吞吐量可以随分布式对象存储中的节点数量而变化,因为对象的多个副本可能存储在不同的节点上。

Object 转换(Object resolution)

Object resolution 是将 ObjectRef 转换为普通值的过程,例如在使用 get 或者作为任务参数传递时会自动转换。

ObjectRef  包含两个字段:

通过直接从所有者的进程内的存储复制小对象来解析小对象。例如,如果所有者调用“ray.get”,系统将查找并反序列化本地进程内存储中的值。如果所有者提交了一个含有依赖任务,它将通过将值直接复制到 task specification 中来内联对象。类似地,如果借用者试图解析值,则对象值将直接从所有者处复制。

如果是个大对象会经过上面图中的流程进行解析。

对象 x 是在 Node 2 创建的,当 Owner 调用这个对象时会先查找对象的位置,并发出副本请求从 Node 2 中复制,然后 Node 1 接收这个对象。

大型对象存储在分布式对象存储中,必须使用分布式协议进行解析。如果对象已经存储在引用持有者的本地共享内存存储中,则引用持有者可以通过 IPC 检索对象。这将返回一个指向共享内存的指针,该内存可能同时被同一节点上的其它 worker 引用。

如果该对象在本地共享内存存储中不可用,则引用持有者会通知其本地 raylet,然后它会尝试从远程 raylet 获取副本。raylet 从对象目录中查找位置,并从这些 raylet 之一请求传输。自 Ray v1.3+ 起,对象目录存储在所有者处(以前存储在GCS中)。

Memory management

对于远程任务,对象值由正在执行的工作程序计算。如果值很小,worker 将直接向所有者回复值,并将其复制到所有者的进程内存储中。一旦所有引用超出范围,此值将被删除。

主副本与可收回副本。主副本(节点2)不符合逐出资格。但是,节点1(通过“ray.get”创建)和节点3(通过任务提交创建)上的副本可以在内存不够时被逐出。

如果该值较大,则执行工作程序将该值存储在其本地共享内存存储中。共享内存对象的初始副本称为主副本。主副本是唯一的。

因为只要范围中有引用,它就不会被释放。raylet 通过保存对存储对象的物理共享内存缓冲区的引用来“锁定”主副本,从而防止对象存储区将其逐出。相反,如果在本地内存不够时,对象的其它副本可能会被 LRU 逐出,除非开发人员在使用这个对象。

在大多数情况下,主副本是要创建的对象的第一个副本。如果初始副本因故障而丢失,所有者将尝试根据对象的可用位置指定新的主副本。

一旦对象引用计数变为 0,对象的所有副本最终都会被自动垃圾收集。所有者会立即从进程中存储中删除小对象。raylets 将从分布式对象存储中异步删除大型对象。

raylets 还管理分布式对象传输,该传输基于对象当前需要的位置创建对象的其他副本,例如,如果依赖于对象的任务被调度到远程节点。


由于以下任何原因,对象可能存储在节点的共享内存对象存储中:

  • 它是由本地工作进程通过 “ray.get” 或 “ray.wait” 请求的。一旦工作进程完成“ray.get” 请求,就可以释放这些资源。注意,对于可以是 zero-copy,从 “ray.get” 返回的 Python 的值直接引用共享内存缓冲区,因此对象将被“固定”,直到该Python值超出范围。
  • 它由在该节点上执行的前一个任务返回。一旦没有对对象的更多引用,或者一旦对象被引用,这些对象就可以被释放。
  • 它是由该节点上的本地工作进程通过 “ray.put” 创建的,一旦不再引用对象(上图中节点1上的对象A、B 和 C),就可以释放这些对象。
  • 它是在该节点上排队或执行的任务的参数。一旦任务完成或不再排队,就可以释放这些资源。节点 2 上的对象 B 和 C 都是这样的例子,因为它们的下游任务 g 和 h 尚未完成。
  • 此节点以前需要它,例如,已完成的任务需要它。节点 2 上的对象 A 就是一个例子,因为 f 已经完成了执行。如果内存不足,这些对象可能会基于本地 LRU 被逐出。当ObjectRef 超出范围时,它们也会被快速释放(例如,在 f 完成并调用 “del A” 之后,A 从节点 2 中删除)。

内存不足的情况

对于小对象,Ray 当前不会对每个工作进程的进程存储施加内存限制。你需要确保小对象不会太多,导致所有者进程因内存不足而被终止。

Ray 对共享内存对象施加限制由 raylet 负责强制执行此限制。下面是可以存储在节点上的不同类型的共享内存对象的可视化,具有基本的优先级。

对象创建请求由 raylet 排队,并在(6)中有足够的内存用于创建对象时提供服务。如果需要更多内存,raylet将选择要从(3)-(5)中逐出的对象以腾出空间。即使在所有这些对象被逐出后,raylet 也可能没有空间用于新对象。如果应用程序所需的总内存大于集群的内存容量,就会发生这种情况。

如果驱逐后需要更多的空间,raylet 首先会在整个集群中的每个 worker 处触发特定于语言的垃圾收集。在语言前端看到的 ObjectRef 似乎很小,因此不太可能触发通常的特定语言垃圾回收机制。然而,ObjectRef 的实际内存占用可能非常大,因为物理值存储在 Ray 的对象存储中的其它位置,并且可能存储在与语言级别 ObjectRef 不同的节点上。因此,当任何 Ray 对象存储达到容量时,我们会在所有工作线程上触发语言级别的垃圾收集,这将清除所有不需要的 ObjectRef,并允许从对象存储中释放物理值。

raylet 会在触发溢出之前,让 worker 有时间异步垃圾收集ObjectRef。溢出允许从对象存储中释放(2)中的主副本,即使对象仍然可以被引用。如果禁用溢出,则应用程序将在可配置超时后接收 ObjectStoreFullError。溢出可能代价很高,并且增加任务执行的长时间;因此,一旦对象存储达到可配置阈值(默认为80%),Ray 也会急快速溢出对象,以确保可用空间。

注意,即使启用了对象溢出,对象存储仍可能耗尽内存。如果同时使用的对象太多(1),则会发生这种情况。为了减轻这种情况,raylet 限制了正在执行的任务的参数的总大小,因为在任务完成之前无法释放参数。默认上限为对象存储内存的 70%。这确保了只要没有其他对象因 “ray.get” 请求而被活动锁定,任务就可以创建一个对象存储容量的 30%。

目前,raylet 没有为 worker 的“ray.get”请求的对象实现类似的上限,因为这样做可能会导致任务之间的死锁。因此,如果对大型对象有过多的并发 “ray.get” 请求,raylet 仍可能耗尽共享内存。发生这种情况时,raylet 会将对象分配为本地磁盘上的内存映射文件(默认情况下为/tmp)。由于I/O开销,这样分配的对象性能较差,但即使对象存储已满,它也允许应用程序继续运行。如果本地磁盘已满,则分配将失败,之后应用程序将收到 OutOfDiskError。

对象溢出(Object spilling)

Ray 默认支持在对象存储的容量用完后将对象溢出到外部存储。

外部存储通过可插拔接口实现。默认情况下支持两种类型的外部存储:

本地存储。默认情况下选择本地磁盘,这样 Ray 用户就可以使用对象溢出功能,而无需任何额外配置。

分布式存储(实验性,目前提供 Amazon S3)。访问速度可能较慢,但这可以提供更好的容错性,因为数据可以在工作节点故障后存活。

对象溢出由几个部分构成:

raylet 内

  • 本地对象管理器:跟踪对象元数据,例如外部存储中的位置,并协调 IO woker 和与其它 raylet 的通信。
  • 共享内存对象存储

IO workers

用于溢出和恢复对象的 python 进程。

外部存储

用于存放无法放入共享内存对象存储的对象。

raylet 管理一个 I/O worker 池。I/O  worker 从本地共享内存对象存储和外部存储进行读/写。

当 Ray 没有足够的内存容量来创建对象时,它会引发对象溢出。请注意,Ray 只溢出对象的主副本:这是通过执行任务或通过 “Ray.put” 创建的对象的初始副本。非主副本可以立即被逐出,这种设计确保了集群中每个对象最多有一个溢出的副本。只有在对象溢出后,或者应用程序中没有更多引用时,主副本才可收回。

协议如下所示,重复执行,直到留出足够的空间来创建任何需要的对象:

  • Raylet(本地对象管理器)查找本地对象存储中的所有主副本。
  • Raylet将这些对象的溢出请求发送给 IO worker。
  • IO worker 将对象值及其元数据写入外部存储。
  • 一旦主副本溢出到外部存储,raylet 将使用溢出对象的位置更新对象目录。
  • 对象存储区收回主副本。
  • 一旦对象的引用计数变为0,所有者就会通知 raylet 可以删除该对象。raylet 向 IO worker 发送请求,以从外部存储中删除对象。

溢出的对象将根据需要恢复。当请求对象时,Raylet 要么通过向本地 IO worker 发送恢复请求从外部存储恢复对象,要么从不同节点上的 Raylet 获取副本。远程 Raylet 可能会将对象溢出到本地存储(例如,本地SSD)上。在这种情况下,远程 raylet 直接从本地存储读取对象并将其发送到网络。

由于 IO 开销,每个文件一个对象溢出许多小对象是低效的。对于本地存储,操作系统将很快耗尽 inode。如果对象小于 100MB,Ray 会将对象融合到单个文件中以避免此问题。

Ray 还支持多目录溢出,这意味着它使用安装在不同位置的多个文件系统。当多个本地磁盘连接到同一台机器时,这有助于提高溢出带宽和最大外部存储容量。

目前存在的限制:

  • 使用本地文件存储时,如果存储溢出对象的节点丢失,则溢出对象将丢失。在这种情况下,Ray 将尝试恢复对象,就像它从共享内存中丢失一样。
  • 如果所有者丢失,则无法访问溢出的对象,因为所有者存储对象的位置。
  • 应用程序当前正在使用的对象被 “pinned”。例如,如果 Python 的 Driver 有一个指向ray.get 获得的对象的原始指针(例如,共享内存上的 numpy 数组),则该对象将被固定。在应用程序释放这些对象之前,它们是不可使用溢出机制的。正在运行的任务的参数也固定在任务的持续运行的时间内,运行结束后才可以。

引用计数

每个 worker 存储其所拥有的每个对象的引用计数。所有者的本地引用计数包括本地Python 引用计数和作为所有者提交的任务所依赖的对象。当 Python 的 “ObjectRef” 被释放时,前者将递减。当依赖于对象的任务成功完成时(注意,以应用程序级异常结束的任务视为成功),后者将递减。

ObjectRef 也可以通过将它们复制到另一个进程。接收 “ObjectRef” 副本的过程称为借用者。例如:

@ray.remote
def temp_borrow(obj_refs):

  # Can use obj_refs temporarily as if I am the owner.

  x = ray.get(obj_refs[0])

@ray.remote
class Borrower:

  def borrow(self, obj_refs):

    # We save the ObjectRef in local state, so we are still borrowing the object once this task finishes.

   self.x = obj_refs[0]

x_ref = foo.remote()

temp_borrow.remote([x_ref])  # Passing x_ref in a list will allow `borrow` to run before the value is ready.

b = Borrower.remote()

b.borrow.remote([x_ref])  # x_ref can also be borrowed permanently by an actor.

通过跟踪这些引用。简言之,每当引用“逃离”本地作用域时,所有者就会添加到本地引用计数中。例如,在上面的代码中,当调用 “temp_borrow.remote” 和“b.borrow.remoto” 时,所有者会增加 x_ref 的挂起任务计数。一旦任务完成,它会向所有者回复一个仍在借用的引用列表。例如,在上述代码中,“temp_borrow” 的 worker 会回答说,它不再借用 “x_ref”,而 “Borrower” 的 worker 会回答说它仍在借用 “x_ref”。

如果 worker 仍在借用任何对象,所有者会将 worker 的 ID 添加到本地的 borrowers 列表中。borrowers 保持第二个本地参考计数,与所有者类似,一旦 borrowers 的本地参考计数变为 0,所有者要求 borrowers 回复。此时,所有者可以将 worker 从 borrowers 列表中删除并收集对象。在上述示例中,“borrowers” 的 worker 正在永久借用引用,因此所有者在 “borrowers” 自身超出范围或死亡之前不会释放对象。

borrowers 也可以递归地添加到所有者列表中。如果 borrowers 本身将 “ObjectRef” 传递给另一个进程,就会发生这种情况。在这种情况下,当 borrowers 响应所有者其本地引用计数为 0 时,它还包括其创建的任何新 borrowers 。所有者反过来使用相同的协议联系这些新的 borrowers。

根据上述一共包含下面不同的引用计数:

本地 python 引用计数

等于 worker 进程中 python 的引用技术,在取消或者分配 python 中的 ObjectRef 递增或递减。

提交任务计数

依赖于尚未完成执行的对象的任务数。当 worker 提交任务时递增。当任务完成时递减。如果对象足够小,可以存储在进程内存储中,则在将对象复制到 Task specification 中时,此计数会提前递减。

借用者(Borrowers)

当前借用 “ObjectRef” 的进程的一组工作 ID。借用者是一个 worker 但不是所有者并且拥有 Python 本地实例的 ObjectRef,每个借用者还会维护一个本地的借用者列表,允许借用者将 “ObjectRef” 发送给另一借用者,而无需联系所有者 。当任务被传递一个 ObjectRef 并在任务结束后继续使用它时,该任务通知其调用方它正在借用该对象。然后,被调用的 worker 将任务 的 worker 的 ID 添加到此集合中。

当 ObjectRef 的引用计数为 0 时,如果是所有者本身会自动删除,所有者向每个### 借用者发送异步 RPC。借用者在收到后将其删除,如果无法联系到借用者,会从列表中删除。

如果借用者被移除,worker 会等待来自所有者的 rpc,一旦 worker 本地的引用计数为 0,worker 就会将借用者弹出并告知所有者。

嵌套计数(Nested count)

在作用域中且其值包含有 ObjectRefObjectRef 数。

谱系计数(Lineage count)

启用对象重建时使用。依赖于此 “ObjectRef” 且其值存储在分布式对象存储中(可能在失败时丢失)的任务数。在提交依赖于对象的任务时递增。如果任务返回的“ObjectRef”超出范围,或者任务完成并在进程内存储中返回值,则递减。

Corner cases

x_ref = foo.remote()
@ray.remote
def capture():
  ray.get(x_ref)  # x_ref is captured. It will be pinned as long as the driver lives.

创建引用的常规做法是将 ObjectRef 作为任务参数直接传递给其它 worker,或者在数据结构(如列表)内部传递。也可以通过使用 “ray.cloudpickle” 对 “ObjectRef” 进行额外引用。在上面代码下,ray 无法跟踪对象的序列化副本或确定 ObjectRef 何时已反序列化(例如,如果 ObjectRef 由非 Ray 进程反序列化)。因此,将向对象的计数添加一个永久引用,以防止对象超出范围。

带 out-of-band 序列化的其它方法包括使用 “pickle” 或自定义序列化方法。与上述类似,Ray 无法跟踪这些引用。访问反序列化的 ObjectRef(即通过调用“ray.get”或作为任务参数传递)可能会导致引用计数异常。

Actor handles

用于跟踪(非分离)Actor 的生命周期。虚拟对象用于代表 Actor。此对象的 ID 是根据 Actor 创建任务的 ID 计算的。Actor 的创建者拥有虚拟对象。

当 Python 的 Actor handle 被释放时,这会减少虚拟对象的本地引用计数。当在 Actor handle 上提交任务时,这会增加虚拟对象的已提交任务计数。当一个 Actor handle 被传递给另一个进程时,接收进程被算作虚拟对象的借用者。一旦引用计数达到 0,所有者就通知 GCS 服务可以安全地销毁 Actor。

Actor 是不会被 Ray 自动回收的,需要显式删除。

与 Python GC

当对象是 Python 中引用循环的一部分时,Python 不能保证这些对象会被及时地垃圾回收。所有ObjectRef 可以在分布式对象存储中恶意地保持 Ray 对象的活动状态,当对象存储接近容量时,Ray 会周期性地在所有 Python worker 中触发“gc.collect()”。这确保 Python 引用循环不会导致虚假的对象存储已满状态。

Object Failure

在发生系统故障时,Ray 将尝试恢复任何丢失的对象,如果无法恢复,并且 worker 试图获取对象的值,则会引发应用程序级异常。

在更高层次上,Ray 保证如果所有者仍然活着,将尝试恢复对象。如果恢复失败,所有者将在异常中填写原因。如果对象的所有者已经死亡,任何试图获取该值的 worker 都会收到一个关于所有者死亡的异常,即使对象副本仍然存在于集群中。

小对象

小对象存储在所有者的进程中对象存储中,因此如果所有者死亡,小对象将丢失。任何试图在未来获取对象值的 worker 都将收到所有者已死亡的异常,并将错误存储在本地进程内对象存储中。

大对象和谱系重建

如果不存在其它副本,Ray 将尝试通过恢复对象。这是指通过重新执行创建对象的任务来恢复丢失的对象。如果任务的依赖关系也丢失,或者以前由于垃圾收集而被逐出,那么这些对象将被递归地重建。

谱系重建通过在每个对象旁边保持额外的“谱系引用计数”来工作。这是指依赖于对象本身可能被重新执行的任务数。如果任务或下游任务返回的任何对象仍在范围内,则可以重新执行任务。一旦谱系引用计数达到 0,Ray 将垃圾回收创建对象的 task specification。请注意,这是一个独立于对象值的垃圾收集机制:如果对象的直接引用计数达到 0,则即使其谱系计数保持在范围内,其值也将从 Ray 的对象存储中进行垃圾收集。

请注意,谱系重建可能会导致比通常更高的 Driver 内存使用率。如果总大小超过系统范围阈值(默认值为1GB),每个 Ray 工作程序将尝试回收其本地缓存的谱系计数。

谱系重建目前有以下限制。如果应用程序不满足这些要求,那么它将收到一个重建失败的异常:

  • 对象及其任何传递依赖项必须由任务(actor or non-actor)生成。这意味着 ray.put 创建的对象不可恢复。请注意,由 “ray.put” 创建的对象始终与其所有者存储在同一节点上,所有者将最终与该节点共享;因此,在 “ray.put” 对象的主副本丢失的情况下,应用程序将收到一个通用的 “OwnerDiedError”。
  • 任务被假定为确定性和幂等性的。因此,默认情况下,由 Actor 任务创建的对象是不可重构的。如果用户将参与者的 “max_task_retrys” 和 “max_restarts” 设置为非零值,则可以作为谱系的一部分重新执行参与者任务。
  • 任务将仅重新执行其最大重试次数。默认情况下非 Actor 最多重试 3 次,Actor 不能重试。你可以通过 “max_retrys” 和 “max_task_retry” 参数修改。
  • 对象的所有者必须仍然活着。

如果存储在分布式内存中的对象的所有者丢失:在对象解析期间,raylet 将尝试定位对象的副本。同时,raylet 将定期与所有者联系,以检查所有者是否还活着。如果所有者已死亡,raylet 将存储一个系统级错误,该错误将在对象解析期间抛出给引用持有者。

任务管理

任务执行

任务调用方在从分布式调度程序请求资源之前等待创建所有任务参数可用。在许多情况下,任务的调用者也是任务参数的所有者。任务的调用方可能借用了任务参数,即它从所有者处收到了参数 “ObjectRef” 的反序列化副本。在这种情况下,任务调用方必须通过与参数所有者执行协议来确定参数是否已创建。借用进程将在反序列化 “ObjectRef” 时与所有者联系。一旦创建了对象,所有者就会做出响应,借用者会将对象标记为就绪。如果所有者失败,借用者也会将该对象标记为已准备好,因为对象的命运与所有者共享。

任务可以有三种类型的参数:plain values, inlined objects, 和 non-inlined objects.

plain values 不需要依赖关系解析。

inlined objects 是足够小的对象,可以存储在进程内存储中(默认阈值为100KB)。调用者可以将这些直接复制到 task specification 中。

non-inlined objects 是存储在分布式对象存储中的对象。其中包括大对象和已被所有者以外的进程借用的对象。在这种情况下,调用者将要求 raylet 在调度决策期间说明这些依赖关系。raylet 将等待这些对象成为其节点的本地对象,然后再给予依赖这个任务的 worker 。这确保了正在执行的工作程序在接收任务时不会阻塞(等待对象变为本地对象)。

资源调度实现

任务调用程序通过首先向请求的首选 raylet 发送资源请求来调度任务。可选择以下任一项:

  • 按数据位置:如果任务的对象参数存储在共享内存中,则调用方选择本地对象参数最多的节点。该信息通过调用方的本地对象目录检索,可能是过时的(例如,如果同时发生对象传输或逐出)。
  • 按节点关联:如果目标 raylet 使用了 NodeAffinitySchedulengStrategy 指定。
  • 默认情况下使用本地的 raylet。

首选 raylet 对请求进行排队,如果它要给予资源,则使用当前租给调用者的本地 worker 的地址来响应调用者。只要主动请求方和租用的 worker 还活着,租约就保持活动状态,并且 raylet 确保在租约处于活动状态时,其他客户端都不能使用该 worker。为了确保公平性,如果没有剩余的任务或已经过了足够的时间(例如,几百毫秒),调用方将返回空闲的工作程序。

请求方可以将任意数量的任务调度到租用的 worker 上,只要这些任务与授权的资源请求兼容即可。因此,租用可以被认为是一种以避免与调度器进行类似调度请求的通信优化方案。调度请求可以重用租用的工作人员,如果它具有以下相同的条件:

  • 资源规模,如 CPU:1。
  • 共享内存任务参数,因为这些参数必须在任务执行之前在节点上设置为本地。注意,小任务参数不需要匹配,因为这些参数被内联到任务参数中。此外,在数据结构内部传递的 ObjectRef 不需要匹配,因为 Ray 不会在任务开始之前将这些 ObjectRef 设置为本地。
  • 运行时环境,因为租用的工作程序在此环境中启动。

调用方可以持有多个 worker 租约以提高并行性。worker 租约在多个任务之间缓存,可以以减少调度程序的负载。

如果首选 raylet 选择不在本地给予资源,它还可以使用远程 raylet 的地址来响应调用者,调用者应该在该地址重试资源请求。这就是所谓的溢出调度。远程 raylet 可以根据其本地资源的当前可用性同意或拒绝资源请求。如果资源请求被拒绝,则调用者将再次从首选 raylet 请求,并且重复相同的过程,直到某个 raylet 授予资源请求。

资源管理和调度

Ray 中的资源是一个键-值对,其中键表示资源名称,值是一个浮点数。为了方便起见,Ray 调度器具有对C PU、GPU 和内存资源类型默认支持。 Ray 的资源是逻辑上的资源,不需要与物理资源进行 1 对 1 映射,默认情况下,Ray将每个节点上的逻辑资源数量设置为 Ray 自动检测到的物理数量。

用户还可以使用自定义资源需求,例如,指定资源需求{“custom_resource”:0.01}。可以在启动时向节点添加自定义资源。

分布式调度程序会尝试匹配集群中合适的资源,如一个任务要求 {“CPU”:1.0,“GPU”:1.0,那么此任务只能在 CPU >=1 和 GPU >=1 的节点上调度。默认每个@ray.remote 函数一定会需要 1 个 CPU 来运行。对于 actor 来说,默认是 0 个 CPU。这样一来,单个节点可以承载比其核心更多的 actor,从而将 CPU 尽可能留给操作系统。

有一些资源需要特殊处理:

  • CPU、GPU 和“内存”的数量在 Ray 启动期间自动检测。
  • 将 GPU 资源分配给任务将自动设置到 worker 的 CUDA_VISIBLE_DEVICES 系统变量,通过 ID 的方式限制使用的 GPU。

注意,因为资源请求是逻辑上的,所以 Ray 不会强制执行物理资源限制。用户可以指定准确的资源需求,例如,为具有 n 个线程的任务指定 “num_cpus=n” 。

分布式调度

资源统计

每个 raylet 跟踪所在节点的资源。当授予资源请求时,raylet 会相应地减少可用的本地资源。一旦资源被释放(或请求者死亡),raylet 就会相应地增加本地资源的可用性。因此,raylet 始终具有本地资源可用性的高度一致的视图。

每个 raylet 还从 GCS 接收关于集群中其他节点上资源可用性的信息。这用于分布式调度,例如,跨集群中的节点进行负载平衡。为了减少收集和传播的开销,这些信息是最终一致性的,可能会存在过时的问题。信息通过定期广播发送。GCS定期(默认情况下为 100ms)从每个 Ray 节点获取可用的资源,然后将其聚合并广播回每个 Ray 节点。

调度状态机

当 raylet 接收到资源请求(即 RequestWorkerLease PRC)时,它将通过上述状态机并以下面三种状态之一结束:

  • Granted:客户端现在可以使用 被授权的资源和 worker 来执行任务或 Actor。
  • Reschedule:根据当前节点对集群的观察,有一个比当前节点更好的节点。客户应重新安排请求。有两种可能性触发这个情况:
  • 如果当前节点是客户端的首选 raylet(即客户端联系的第一个 raylet),则这是一个 spillback 请求。客户端应在第一个 raylet 指定的 raylet 处重试请求。
  • 否则,当前节点是客户端首选 raylet 选择的节点。客户端应该再次在首选的 raylet 上重试请求。
  • Canceled:无法调度运行所需的资源。
  • 被选中的已经失效。
  • 无法为任务创建合适的运行环境,无法启动工作京城。

调度策略

Ray 有几个调度策略来控制在哪里运行任务或参与者。当提交任务或参与者时,用户可以选择指定要使用的调度策略/策略。

默认混合策略

当没有指定其它策略时,这是默认策略。此策略首先尝试将任务打包到本地节点,直到节点的关键资源利用率超过配置的阈值(默认情况下为 50%)。关键资源利用率是该节点上任何资源的最大利用率,例如,如果节点使用 8/10 个 CPU 和 70/100GB RAM,则其关键资源利用是 80%。

在本地节点上超过阈值后,策略将任务打包到第一个远程节点(按节点 id 排序),然后打包到第二个远程节点,依此类推,直到所有节点上的关键资源利用率超过阈值。之后,它将选择关键资源利用率最低的节点。

该策略的目的是在 bin-packing 和负载均衡之间实现平衡。当节点处于临界资源利用率时,策略倾向于 bin-packing 。按节点 ID 排序可确保所有节点在 bin-packing 时使用相同的顺序。当节点超过临界资源利用率时,策略支持负载平衡,选择负载最少的节点。

差价策略

此策略是循环在具有可用资源的节点之间分配任务。这个循环是本地的,并不是全局的。

节点相关性策略

使用此策略,用户可以明确指定任务或 Actor 应运行的目标节点。如果目标节点处于活动状态,则任务或 Actor 仅在那里运行。如果目标节点已死亡,则看是否能被调度到其它节点,不行的话就不会调度。

数据位置策略

Ray 通过让每个任务调用方基于调用方关于任务参数位置的本地信息选择首选 raylet。raylets 实现的单独的调度策略不考虑数据位置,这是为了避免向 raylet 添加用于发现哪些任务参数存储在哪些其他节点上的额外的 RPC 和复杂性。

预占用组策略

此策略将任务或 Actor 运行在指定的预占用组。

预占用组

Ray 支持预占用组这个功能,从多个节点中自动保留一组资源。它可以用于普通任务或者 Actor 的调度。通常用于 gang - scheduling actors

由于资源组可能涉及跨多个节点的资源,Ray 使用跨 raylets 来确保原子性。该协议由GCS协调。如果有任何 raylet 在协议执行过程中死亡,预占用组的创建将回滚,GCS 将再次对请求进行排队。如果 GCS 请求失效,且 GCS 容错功能启用,则重启后会 ping 所有参与者以重新启动协议。

与 Ray 中的其它东西不同,预占用组没有引用计数,由创建它们的 worker 和独立的 Actor 所有。在所有者死亡时自动销毁。你也可以显式销毁,销毁后使用这些保留的资源的任务和 Actor 都会被杀死,资源被释放。

当一个预占用组被创建时,它会请求保留了多个节点的资源。当其中一个节点故障时,确缺失的资源会重新被安排,优于其它还没分配完成的预占用组。在这些缺失的资源包被重新创建之前,该预占用组仍然处于部分分配状态。

Actor 管理

Actor 创建

当在 Python 中创建 Actor 时,创建工作人员首先向 GCS 注册 Actor 。对于独立 Actor ,注册是以同步的方式进行的,以避免同名 Actor 注册。对于非独立的 Actor(默认), 使用异步注册。

注册后,一旦解决了 Actor 创建任务的所有输入依赖关系,创建者就将 task specification 发送给 GCS 服务。然后,GCS 服务通过与正常任务相同的分布式调度协议来调度 Actor 创建任务,就好像 GCS 是 Actor 创建任务的调用者一样。

Actor handle 的原始创建者可以在 Actor handle 上提交任务,甚至在 GCS 安排 Actor 创建任务之前将其作为参数传递给其它任务/ Actor。

异步注册时,在 Actor 向 GCS 注册之前,创建者不会将 Actor handle 传递给其它任务/ Actor。这是为了防止创建者在注册完成之前死亡;通过阻止任务提交,我们可以确保引用 Actor 的其它 worker 可以发现注册失败。在这种情况下,任务提交仍然是异步的,因为创建者只是缓冲远程任务,直到 Actor 注册完成。

一旦创建 Actor 完成,GCS 将通过 pub-sub 通知任何拥有这个 Actor 的 handle 的 worker。每个 handle 缓存新创建的 Actor 的运行时元数据(例如 RPC 地址)。

然后,在 Actor handle 上提交的任何未处理的任务都可以发送给 Actor 执行。

与任务定义类似,Actor 定义通过 GCS 下载到 worker 上。

Actor 执行

Actor 可以有无限数量的调用者。 一个 Actor handle 表示单个调用者,它包含其引用的Actor 的 RPC 地址。需要调用的 worker 将连接到这个地址并且提交任务。

一旦创建, Actor 任务就转换为对 Actor 进程的直接 gRPC 调用。一个 Actor 可以处理多个并发调用,尽管这里只显示了一个。

Actor 死亡

Actor 可以使独立或者非独立的,默认是非独立的。当所有 handle 超出范围或执行结束时,Ray 会自动垃圾回收它们。独立的 Actor 的生命周期与他们的原始创建者无关,一旦不再需要他们,应用程序必须手动删除他们。

对于非独立的 Actor,当 Actor 的所有等待的任务都已完成,所有的 Actor handle 都已超出范围(通过引用计数进行跟踪)时,Actor 的原始创建者通知 GCS 服务。GCS 服务然后向参与者发送 KillActor RPC 杀死 Actor。


如果 GCS 检测到创建者已退出(通过心跳),GCS 也会终止 Actor。在这个 Actor 上提交的所有未处理的任务和后续任务都将失败,并出现 RayActorError。

Actor 也可能在运行时意外崩溃,默认情况下,提交给崩溃了的 Actor 的所有任务都将失败,并出现 RayActorError,就像 Actor 正常退出一样。

Ray 还提供了一个(max_restarts)来自动重新启动 Actor,可以指定最多重启次数。如果启用了此选项,并且 Actor 的所有者仍然活着,GCS 服务将尝试通过重新提交其创建任务来重新启动崩溃的 Actor。所有具有 Actor handle 的客户端都会将任何未处理的任务缓存到 Actor,直到 Actor 重新启动。如果 Actor 无法重新启动或已达到最大重启次数,则客户端将使所有等待任务失败。

第二个选项(max_task_retrys)可用于在 Actor 重新启动后自动重试失败的 Actor 任务。这对于幂等任务和用户不需要自定义处理 RayActorError 的情况非常有用。

全局控制服务(Global Control Service,GCS)

全局控制服务,也称为GCS,是 Ray 的控制平台。它管理 Ray 集群,并充当协调 raylets 和发现其他节点进程的集中平台。GCS 还作为外部服务(如自动缩放器和仪表盘)与 Ray 集群通信的入口。GCS 目前是单线程的,心跳检查和资源轮询除外;

相关功能有:

  • 节点管理,管理集群节点的添加和删除,并广播给所有 raylet。
  • 资源管理,广播所有 raylet 并确认每个的资源可用性。
  • Actor 管理,处理 Actor 的创建和销毁请求、监测他们是否活跃,在出现问题时尝试重新创建。
  • 预占用组管理,处理 Ray 的预占用组的创建和删除。
  • 元数据存储,提供所有 worker 都能访问的键值存储,仅适用于小的元数据,任务和对象的元数据存储在拥有者的 worker 中。
  • worker 管理,处理 raylet 的故障报告。
  • 运行环境管理,用于管理运行环境包,包括包的使用次数和垃圾回收次数。

GCS 还提供了几个 gRPC 端点用来帮助获取当前 Ray 集群的状态,如 Actor
worker 、节点信息等。

GCS 默认使用简单的哈希 map 内存存储,可以将它放到 Redis 中。

节点管理

当 raylet 启动时会向 GCS 注册,GCS 会将 raylet 的信息存入存储中,注册成功后广播其它所有的 raylet 节点。

节点注册后,GCS 通过定期进行健康检查来监控 raylet 的活跃度。GCS 还获取 raylet 的资源情况,并将其广播给其它 raylet 节点。如果 raylet 检查失败,GCS 还会向集群广播 raylet 的死亡。一旦 raylet 接收到信息,它们就会清除相关的记录。

Raylets 还向 GCS 报告任何 worker 进程的死亡,以便将其广播给其它 Raylets。方便提前终止向该 worker 提交任务之类的场景。

资源管理

GCS 负责确保 raylet 拥有集群中资源使用情况的最新记录。如果记录不是最新的,raylet 可能会错误地将任务调度到另一个没有资源的节点运行任务。

默认情况下,GCS 将每 100ms 从注册的 raylet 中提取资源使用量。它还每100毫秒向所有 raylet 广播全局资源情况。

GCS 也是自动缩放器获取当前集群负载的入口点。自动缩放器使用它来分配或删除集群中的节点。

Actor 管理

GCS 在 Actor 管理中发挥着重要作用。所有 Actor 都需要先在 GCS 登记,然后才能调度。GCS 也是独立 Actor 的所有者。

预占用组管理

GCS 还管理预占用组的生命周期。GCS 通过两阶段提交协议来创建预占用组。

元数据存储

  • 集群仪表盘地址。
  • 远程功能的定义,在开发项目中定义远程运行的函数时,Ray 会检查有没有注册过,没有的话会添加。被指派的 worker 会从 GCS 中获取定义。
  • 运行环境的数据,默认情况下运行环境目录存储在 GCS 中,GCS 通过计算独立 Actor 和 job 使用情况来进行垃圾回收。
  • 一些 Ray 的其它组件的数据也会存储在这,如 Ray Serve 会将部署的元数据存储在这。

容错性

GCS 是 Ray 中非常关键的组件,故障的话整个集群都会故障。

在 2.0 中 GCS 在故障中会尝试恢复,恢复之前可能集群工作会不正常。

默认情况下,GCS 将所有数据存储到内存存储中,一旦发生故障,该存储将丢失。为了使 GCS 能够容错,它必须将数据写入持久存储。Ray 支持 Redis 作为外部存储系统。为了支持GCS容错,GCS 应该有一个高可用 Redis 实例作为支持。然后,当 GCS 重新启动时,它首先从 Redis 存储中加载信息,包括发生故障时集群中运行的所有 Raylet、actor 和预占用组。然后,GCS 将恢复健康检查和资源管理等常规功能。

GCS 故障时下列功能都无法使用:

  • Actor 的管理。
  • 预占用组的管理。
  • 资源管理。
  • raylet 管理。
  • worker 的管理。

由于这些组件不需要读取或写入GCS,因此任何正在运行的 Ray 的任务和 Actor 都将保持活动状态。同样,任何现有对象都将继续可用。

集群管理

集群管理中涉及的流程。蓝色进程是位于 Head 节点上的单进程。粉色进程在节点启动,并管理其本地节点的辅助进程。

Autoscaler 用于自动缩放,根据节点资源情况和利用率添加或删除节点。

Autoscaler 负责从集群中添加和删除节点。它会监测分布式调度器暴露的逻辑资源需求、集群中当前的节点、集群的节点配置文件,计算所需的集群配置,并尝试扩缩容(比如调用云提供商添加或删除机器)。

Autoscaler 的工作流程如下:

- 应用程序提交任务、Actor、预占用组,这些任务请求诸如 cpu 之类的资源。

  • 调度器查看需求和可用性,如果无法满足,则将其置于挂起状态。该信息被快照到GCS 中。
  • 自动缩放器作为单独的进程运行,将定期从 GCS 获取快照。它查看集群中可用的资源、请求的资源、挂起的资源、为集群指定的工作节点配置,并运行 bin-packing 算法来计算节点数量,以满足正在运行和挂起的任务、Actor 和预占用组请求。
  • 自动缩放器然后通过节点供应商接口从集群中添加或删除节点。节点供应商接口允许Ray 插入不同的云提供商(例如AWS、GCP、Azure)、集群管理器(例如Kubernetes)或本地数据中心。
  • 当新的节点启动时,它向集群注册并接受应用程序的工作负载。

如果节点空闲超时(默认情况下为5分钟),则会将其从集群中删除。当没有活动任务、Actor 或对象的主要副本时,节点被视为空闲。

可扩容节点的数量有限制,这取决于扩容速度。速度定义为扩容的节点数与当前节点数的比率。值越高,扩容得越多。例如,如果将其设置为 1.0,则集群的大小在任何时候都最多可以增长 100%,因此如果集群当前有 20 个节点,则最多允许 20 个新增节点。可扩容的最小数量为 5,以确保即使对于小集群也有足够的扩容速度。

Ray 也支持云厂商的不同实例类型和设置镜像、IAM 角色等。还可以指定这个机器是否有特殊资源。

Ray Client server 是 Ray Client 用的代理服务,用于支持集群的交互开发。

API server + Dashbord server,提供仪表盘服务和集群状态管理 API 的入口。

API agent,在本地节点上收集集群管理需要的指标,安装用于任务和 Actor 执行的运行时环境。

Log monitor, 负责监听本地的日志,默认在 /tmp/ray/session_last/logs,如果有错误日志会给到 Driver。

Job 提交

Job 可以通过命令行、Python SDK 或者 REST API 提交到 Ray 集群,CLI 调用 PythonSDK,后者反过来向 Ray 集群上的 Jobs RESTAPI 服务器发出 HTTP 请求。REST API 当前托管在 Ray 仪表板后端,但将来可能会移动到独立的API服务器。

每个 Job 都由自己的专用 Job 主管 Actor 管理,该角色在 Ray Head 节点上运行。这个 Actor 在 Job 的用户指定的运行时环境中运行,Job 的用户指定的端点命令在继承了这个运行环境的子进程中运行。如果这个命令包含 Ray 的脚本,则 Ray 的脚本将附加到正在运行的 Ray 集群。

Job 会报告结构化的状态(如 PENDING, RUNNING, SUCCEEDED)和通过 API 获取到的消息。这些数据都会存储到 GCS 中。

Job 管理的 Actor 及其启动的子流程命运共享。如果 Job 的 Actor 死亡,则保留最新的工作状态。下次用户请求作业状态时,状态将更新为“FAILED”。

通过在相应的 Job 管理的 Actor 上设置“停止”事件,可以异步停止 Job。 这个 Actor 会负责终止 Job 子流程,更新其状态并退出。

Job 的管理和 Job 的 管理 的 Actor 和日志会在入口点脚本的输出中直接写入 Head 节点上的文件里,该文件可以通过 HTTP 端点读取或流式传输。

Ray 2.0 允许在除 Head 节点之外调度管理 job 的 Actor,以便减轻多租户的 Head 节点的压力。

运行时和多租户

它在运行时动态安装在集群上,可以在 Ray Job 中或特定的 Actor 和任务中指定 Python 脚本运行所需的依赖关系,如文件、包和环境变量。

运行环境的安装和删除由运行在每个节点上的 RuntimeEnvAgent gRPC 服务器处理。RuntimeEnvAgent 命运与 raylet 共享,raylet 是调度任务和参与者的核心组件。

当任务或 Actor 需要运行时环境时,raylet 会向 RuntimeEnvAgent 发送一个 gRPC 请求来创建环境(如果环境还不存在)。

创建环境可能需要做:

  • 通过 pip install 下载和安装软件包。
  • 为 Ray 的 worker 进程设置环境变量。
  • 在启动 Ray 的 worker 进程前使用 conda 切换环境。
  • 从远程云存储下载文件。

运行环境资源(如下载的文件和安装的 conda 环境)会缓存在每个节点上,以便它们可以在不同的任务、Actor 和 Job 之间共享。

当超过缓存大小限制时,将删除当前没有 Actor、任务或 Job 在使用的资源。

KubeRay

Kuberray operator 可以在 Kubernetes 设置和管理 Ray 集群。每个 Ray 的节点都作为Kubernetes pod运行在 k8s 上。

可观测性

Dashbord

Ray 提供了像 Dashbord 之类的可视化工具和观测工具来方便检查状态、排查问题。

日志聚合

Ray driver 会聚合并输出从 actor 和任务中打印的所有日志消息。当任务或 actor 将日志打印到其 stdout 或 stderr 时,它们会自动重定向到相应的工作日志文件。日志监视器的进程会在每个节点上运行,定时读取并通过 GCS pubsub 将日志发送到 driver。

指标观测

Ray 与 OpenCensus 集成了,默认支持将可以观测的指标导出到 Prometheus。

状态 API

自 Ray 2.0 以来,Ray 支持允许用户通过 CLI 或 Python SDK 方便地访问当前 Ray 的状态的快照。状态 API 支持查询特定 Ray 任务、 Actor 等的查询。

流程详细示例

分布式任务调度

我们将从 worker 1 执行 A 开始。任务 B 和 C 提交给 worker。worker 1 的本地所有权表已经包含 X 和 Y 的条目。首先,我们将介绍一个调度 B 执行的示例:

  • Worker 1 向其本地调度器请求执行 B 的资源。
  • 调度器 1 作出响应,告诉 worker 1 在节点 2 重试调度请求。
  • Worker 1 更新其本地所有权表,要求任务 B 在节点 2 上挂起。
  • Worker 1 向节点 2 上的调度器请求执行 B 的资源。
  • 调度器 2 将资源授予 worker 1,并用 worker 2 的地址进行响应。调度器 2 确保没有其他任务被分配给 worker 2,而 worker 1 仍然拥有资源。
  • Worker 1 发送任务 B 给 worker 2 执行。

任务的执行

接下来,我们将展示一个 worker 执行任务并将返回值存储在分布式对象存储中的示例:

  1. Worker 2 完成执行 B 并将返回值 X 存储在其本地对象存储中。
    a. 节点 2 异步更新对象表, X 现在位于节点 2 上(虚线箭头)。
    b. 因为是创建的 X 的第一个副本,所以节点 2 还固定 X 的副本,直到 worker 1 通知节点 2 可以释放对象。这将确保对象值在引用期间是可访问的。
  2. Worker 2 回复 worker 1 ,任务 B 已经完成。
  3. Worker 1 更新其本地所有权表,将 X 存储在分布式内存中。
  4. Worker 1将资源返回给调度器 2。worker 2 现在可以被重用来执行其他任务。

分布式任务调度和参数解析

既然任务 B 已经完成,任务 C 就可以开始执行了。Worker 1 使用与任务 B 类似的协议安排下一个任务 C:

  1. 对象表响应调度器 3,表明 X 位于节点 2 上。
  2. 调度器要求节点 2 上的对象库发送一份X的副本
  3. X 被从节点 2 复制到节点 3。
    a. 节点 3 也异步更新对象表,表明 X 也在节点 3 上(虚线箭头)。
    b. 节点 3 的 X 的副本被缓存了,但没有被固定。当本地 worker 正在使用它时,该对象不会被驱逐。然而,与节点 2 上的 X 的副本不同,当对象存储区 3 处于内存压力之下时,节点 3 的副本可能会根据 LRU 被驱逐。如果这种情况发生,而节点 3 又需要该对象,它可以使用这里显示的相同协议从节点 2 或不同的副本重新获取它。
  4. 由于节点 3 现在有一个 X 的本地副本,调度器 3 将资源授予 worker 1,并 worker 3的地址作为回应。

任务执行和对象内联

任务 C 执行并返回一个小到足以存储在进程内存存储中的对象:

  1. Worker 1 发送任务 C 到 worker 3 执行。
  2. Worker 3 从其本地对象存储中获取 X 的值(类似于ray.get),并运行 C(X)。
  3. Worker 3 完成 C 并返回 Y,这次是直接通过值而不是存储在其本地对象存储中。
  4. Worker 1 将 Y 存储在其进程中的内存存储中。它也删除了任务 C 的 specification 和位置,因为 C 已经执行完毕。此时,任务 A 中未完成的ray.get 调用将从 worker 1的进程内存储中找到并返回 Y 的值。
  5. Worker 1 将资源返回给调度器 3。Worker 3 现在可以被重新使用来执行其它任务。这可以在步骤 4 之前完成。

垃圾回收

最后,我们将展示 worker 是如何清理内存的:

  1. Worker 1 删除了对象 X 的记录。这样做是安全的,因为等待中的任务 C 有对 X 的唯一引用,而且 C 现在已经完成了。Worker 1 保留其对 Y 的记录,因为应用程序仍然有对 Y 的 ObjectID 的引用。
    a. 最终,X 的所有副本都从集群中删除。这可以在步骤 1 之后的任何时间进行。如上所述,如果节点 3 的对象存储处于内存压力之下,节点 3 的 X 的副本也可能在步骤 1 之前被删除。