Python 业务框架循环依赖和全局上下文的一些思考

Photo by Tyler Casey / Unsplash
Why do we need controllers?
I’m trying to explain why we do not need controllers because the controller layer is not that important anymore. We split most responsibility of...

背景

去年我有提过我对于 Python 框架的一些设想,这个框架目前已经在内部投入生产环境使用了。在实际开发中很多同学会容易引发循环依赖的问题,这引发我一个思考为什么 Python 的循环依赖问题这么严重呢?如何简单的优雅的解决呢?

Python 提供了方便的模块导入和管理方式,这使得开发结构化的 Python 项目相对简单。但是,这也意味着项目的结构容易变得混乱。例如,我们可能会遇到以下问题:

  • 多重且混乱的循环依赖:如果我们的代码中存在循环依赖,例如一个模块需要导入另一个模块,而另一个模块也需要导入第一个模块,这会导致代码难以维护。
  • 隐含耦合:如果我们对一个类或模块进行更改,可能会影响到其他类或模块的代码,这可能导致代码变得脆弱和难以理解。
  • 大量使用全局变量或上下文:如果我们使用全局变量或上下文来传递信息,而不是明确地传递变量,这可能导致代码出现错误,因为我们无法确定变量的值在哪里被修改。
  • 面条式代码:如果我们的代码中包含大量的if语句和for循环,以及复制-粘贴的代码,这会使代码难以维护和调试。
  • 混沌代码:如果我们的代码缺乏结构和命名规范,可能会导致代码变得混乱和难以理解。

循环依赖是其中最明显也是最容易碰到的问题:A 文件引用 B 文件中的内容,B 文件又引用了 A 文件中的内容。

Python 在引入模块时会按照深度优先的顺序进行 import:

  1. 从sys.path指定路径中查找待引入的模块。
  2. 从模块中加载代码并保证代码能够编译。
  3. 创建与该模块相对应的空对象。
  4. 将空模块对象添加到sys.modules中。
  5. 运行模块对象中的代码,定义其内容。

简单来说就是先去查找模块,然后加载模块代码再创建空对象,运行这个模块对象的代码,反复这个过程。假设 A 模块引入B  模块。B 模块又引入 A 模块。那么 A 模块未定义任何内容,即 A 未完成整个引入过程,此时 A 是空模块,但是 B 又引入 A 模块中的内容,但是 A 此时是空模块,就会出现循环依赖的问题。

常见的解决方法分为两种,一种是不在文件头部引用而是在使用时引入这个做法比较隐式而且不太符合代码规范;一种是给每个模块提供一个初始化用的函数,在引入完成后调用一次这个初始化的函数来实际加载模块。

还有一个比较常见的问题是 Python 不支持单例,这就会导致在多线程下可能会出现创建多个所需对象的问题。比如加载深度学习模型,这种对象是非常非常大的,我们希望即使在多线程下仍然共用一个对象。比较好的办法是将这种对象作为 class 的类变量,不声明在任何方法里。

我们举个我正在开发的这个框架的例子好了,这个框架有个 Telemetry 的插件,我们分别有  TelemetryPlugin 和 TelemetryHandler 两个文件,TelemetryHandler 是用于注册到框架用于拦截请求处理某些当请求到框架时间要 .... 、请求结束时要... 这样的场景的。

TelemetryPlugin 需要初始化 opentelemetry 提供的 Tracer 对象。且这个 Tracer 对象是只有在业务框架启动时才会跟随启动的。

class TelemetryPlugin(Plugin):

    tracer: trace.Tracer

    def __init__(self, app: DollarBird) -> None:
        super().__init__()
        self.app = app

而在 TelemetryHandler 中我们也需要使用到这个 Tracer 对象,我们希望在 Handler 拦截请求的前后能通过这个 Tracer 发送跟踪信息,这样我们就实现自动记录一个请求多少耗时。如果我们这时候需要用到这个 Tracer 的话需要下面这样用:

TelemetryPlugin.tracer

也就是说 TelemetryHandler 依赖 TelemetryPlugin,如果我们还需要在 TelemetryPlugin 中实现当业务框架启动时自动注册 Handler 就无法实现了。因为  TelemetryHandler 依赖 TelemetryPlugin,而 TelemetryPlugin 不能重新引入 TelemetryHandler,要不就会出现循环依赖的问题。

这时候我就开始思考我是否能通过业务框架的全局上下文进行传递或者是放到某个地方托管这个对象。我先调研了下其它类似的 API 开发框架的上下文管理方案。

主流 API 框架上下文管理

Django

当你需要向所有模板传递一个可以被全局使用的变量时。在编写 Django 视图函数时,我们一般会在视图函数中以 Python 字典 (dict) 形式向模板中传递需要被调用或使用的变量并指定渲染模板。通常情况下,我们向模板的传递的字典变量与模板是一一对应的关系。有时我们还需要向模板传递全局变量,即每个模板都需要使用到的变量(比如站点名称, 博客的最新文章列表)。

如果每个视图函数分别去查询数据库,然后向每个模板传递这些变量,不仅造成代码冗余,而且会造成对数据库的重复查询。一个更好的解决方案就是使用自定义的上下文处理器 (Context Processors) 给模板传递全局变量,一次查询全局使用,完美解决了这些问题。

TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [
            os.path.join(BASE_DIR, 'templates')
        ],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [ # 以下包含了4个默认的全局上下文处理器
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
                'myapp.custom_context_processors.xxx',  # 自定义全局上下文处理器
            ],
        },
    },
]

Django 一般包含了上述4个默认全局上下文处理器,如果我们需要自定义全局上下文处理器本质上是个函数,使用它必须满足3个条件:

  1. 传入参数必须有 request 对象
  2. 返回值必须是个字典
  3. 使用前需要在 settings 的 context_processors 里申明。

接下来我们来看一个具体例子。我们需要向所有模板传递一个叫site_name的全局变量以便在所有模板中直接使用 site_name 输出站点名称,我们可以在blog(应用名)的目录下新建 context_processors.py,新增如下代码:

# blog/context_processors.py

from django.conf import settings
def global_site_name(request):
    return {'site_name': settings.SITE_NAME,}

我们可以发现其实 Django 本质上在通过上下文取出指定变量的时候,就是在调用一个无状态的函数。纯函数的好处是给定一个固定的输入,输出也总是固定的,也更适合做单元测试和修改。坏处就是这样不太直观,而且自定义程度比较低。比如我们不好控制这个返回的东西在什么时候初始化,也许是跟随项目启动初始化、又也许是使用时初始化。也不太适合面向对象的场景。

Flask

Flask中有两种上下文,程序上下文(application context)和请求上下文(request context)。

视图函数需要上下文信息,Flask 将请求报文封装在 request 对象中,但是在视图函数中,并没有把它传进视图函数,而是直接从 Flask 导入一个全局的 request 对象,然后在视图函数里直接调用 request 的属性获取数据。为什么在处理请求时,视图函数里的 request 会自动包含对应请求的数据呢?因为 Flask 在每个请求产生后自动激活当前请求的上下文,激活请求上下文后,request 被临时设为全局可访问。当每个请求结束后,flask就销毁对应的请求上下文。

在多线程服务中,在同一时间可能会有多个请求在处理。假设有三个客户端同时向服务器发送请求,这时每个请求都有各自不同的请求报文,所以请求对象必然是不同的。

因此,请求对象只在各自的线程内是全局的。Flask 通过本地线程(thread local)技术将请求对象在特定的线程和请求中全局可访问。

为了方便获取这两种上下文,Flask 提供了一共四种上下文变量:

  • current_app,指向处理请求的当前程序实例。
  • g,替代 Python 的全局变量,确保仅在当前请求可用,用于存储全局数据。每次请求时都会重设。
  • request,客户端你的报文数据。
  • session,用于记住请求之间的数据,通过签名的 Cookie 来实现的。

在不同的视图函数中,request 对象都表示和视图函数对应的请求,就是当前请求(current request)。程序会有多个程序实例的情况,为了能获取对应的程序实例,而不是固定的某一个程序实例,我们就需要使用 current_app 变量。

g 存储在程序上下文中,而程序上下文会随着每一个请求的进入而激活,随着每一个请求的处理完毕而销毁,所以每次请求都会重设这个 g 值。

我们通常会使用它结合钩子来保存每个请求处理前所需要的全局变量。

在 Flask 中如果我们需要将一个对象保存成应用级的全局对象,不会随着请求过来和结束而重置的话,我们就必须通过控制 current_app 来进行传递。比如 flask-sqlalchemy 中是这么初始化的:

teardown_appcontext 会在每次应用环境后调用。

app.extensions["sqlalchemy"] = self
app.teardown_appcontext(self._teardown_session)

这种方式比较简单直观,是通过字典的方式进行保存。但也有风险,其它地方可能会强行修改掉这个字典中 sqlalchemy 的值。如果我们希望这个东西能给到业务开发用而不单单是插件的话,有可能出现误操作,不小心换个错的,而且很难排查。

FastAPI

FastAPI 主要是使用上下文管理器处理需要跟随应用启动和关闭的逻辑。比如用 yield 创建一个异步函数 lifespan() 如下:

from contextlib import asynccontextmanager

from fastapi import FastAPI


def fake_answer_to_everything_ml_model(x: float):
    return x * 42


ml_models = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Load the ML model
    ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
    yield
    # Clean up the ML models and release the resources
    ml_models.clear()


app = FastAPI(lifespan=lifespan)


@app.get("/predict")
async def predict(x: float):
    result = ml_models["answer_to_everything"](x)
    return {"result": result}

最简单实现全局变量的方法是在启动时设置个变量,然后其它地方引用这个变量。

我们的方案

我们自己的框架是一个纯协程框架,所以可以不考虑多线程问题,所以最好的方案是通过实现类似 Flask 的方案来实现,即在 application 中传递一个字典。我们在前面有提到这个方案的问题:如果我们希望这个东西能给到业务开发用而不单单是插件的话,有可能出现误操作,不小心换个错的,而且很难排查。

这时候我想起来我之前给 Ray 社区翻译的白皮书中有个关于对象所有权的概念。

Ray -分布式计算框架架构设计详解 v2
这次带来的是分布式计算框架 Ray v2 版本的架构 设计中文详解,可能是目前关于 Ray 中文资料最详细的博文了。Ray 是一个为了给分布式提供通用的 API 发明出来的分布式计算框。
大多数的系统是通过一种叫做 Ownership 的分散控制的方式管理的,这个方式是指每一个 ObjectRef 都是由所在的 worker 进程管理的,该 worker 或者也被叫做 owner 需要确保 Task 的执行、创建 value。换句话来说 owner 是生成和初始化  ObjectRef 的 worker,如果 ObjectRef 由 Task 返回,那么这个值是由远程 worker 创建的而不是拿到返回值的 worker。

那既然我们能够在多主机多进程上实现所有权概念,那为什么我们不能在一个协程上实现一个简易的所有权字典呢?(已知这种对象是几乎不变更的,即使在添加到字典或者更新时慢一点点也无所谓。)朴素的想法就是谁创建的对象谁负责,只能由原本的创建方修改或者删除。

在协程下,我们首先知道掉所有权的主体不可以是协程或者是线程、进程。协程粒度太小而且、线程一直就一个、进程一直就一个。如果是线程或者进程那基本上所有变量都是同个人。

基于 Python 文件的特性,我们可以以 python 文件为单位进行简单的设计。增加或者修改这个字典的 python 文件是这个新增的对象的所有权的主体,只有它能更改、删除这个对象,但是其它文件可以正常获取这个对象。

那么如何回溯调用者?我想到了一个在捕获异常时常用的一个函数 traceback.print_exception。我们可以通过这个 traceback 包中的另一个函数 traceback.extract_stack 实现回溯调用方的能力。

我们基于上面的想法实现了一个简单的所有权字典:

from collections import UserDict
from typing import Any, Dict
import traceback


class OwnershipDict(UserDict[str, Any]):
    def __init__(  # type:ignore
        self, dict: Dict[str, Any] | None = None, /, **kwargs
    ) -> None:
        self.data = {}
        if dict is not None:
            self.update(dict)
        if kwargs:
            self.update(kwargs)
        self.__ownership_dict: Dict[str, str] = {}

    def __missing__(self, key: str) -> Any:
        if isinstance(key, str):
            raise KeyError
        else:
            return self[str(key)]

    def __setitem__(self, key: str, value: Any) -> Any:
        callback_from_file = self.__check_ownership(key)
        self.data[str(key)] = value
        self.__ownership_dict[key] = callback_from_file

    def __delitem__(self, key: str) -> Any:
        self.__check_ownership(key)
        del self.data[key]
        del self.__ownership_dict[key]

    def __check_ownership(self, key: str) -> Any:
        all_stack = traceback.extract_stack()
        callback_stack = all_stack[-3]
        callback_from_file = callback_stack[0]
        print(f"The Callback from {callback_from_file}")
        if (
            self.__ownership_dict.get(key) is not None
            and not self.__ownership_dict.get(key) == callback_from_file
        ):
            raise KeyError
        return callback_from_file

这个字典有一个所有权字典和实际数据存储的字典,当设置一个值或者删除到字典时会先去判断它是否在所有权中有记录,如果有记录的话并且不是原本设置这个变量的所有权方就会抛出异常。我们平常使用时按照字典一样使用即可:

dict = OwnershipDict()
dict["key"] = "value"
print(dict["key"])

当然目前这个只是几分钟写的简单的版本,还可以继续优化和完善。

我们现在有了所有权字典后只需要学 Flask 一样放到 application 作为它的属性即可实现在不同的请求中都能共用同一个对象并且无法随意替换这个对象,在前面举例 的 TelemetryPlugin 和 TelemetryHandler 的问题上,我们只需要将 TelemetryPlugin 中的 Trancer 对象放到这个字典中,TelemetryHandler 同样去这个字典中取即可,他们之间就不再需要互相 import 对方。

一部分代码示例来自互联网搜索结果,如有侵权请联系我。

小 Tips:很多循环依赖问题其实可以通过 https://peps.python.org/pep-0484/#forward-references 这个方案解决。

class Tree:
    def __init__(self, left: Tree, right: Tree):
        self.left = left
        self.right = right

转变为下面的写法。

class Tree:
    def __init__(self, left: 'Tree', right: 'Tree'):
        self.left = left
        self.right = right

引用的类如果只用于做类型校验可以加个下面的判断,这样仅有在需要类型校验时才会引入。

if TYPE_CHECKING:
    import xxxx