如何实现一个请求级线程安全的缓存

需求

需要实现一个请求级线程安全的缓存,例如在一个请求中,会对反复使用到某个数据库的值,或者文件中的值,这时我们可以使用 ContextVar 来创建一个全局变量对改值进行缓存。但也会出现一些问题。

问题

使用 gunicorn 部署的 Flask/Django/FastAPI 应用,工作模式为 syncgevent(尤其是启用了 threads > 1),使用 ContextVar 来存储请求级别的上下文,就可能出现线程复用导致的数据污染

gunicorn 等 WSGI 服务器中使用多线程时,线程会被复用(回收) 来处理多个请求。而 ContextVar 是绑定在线程上的,如果一个线程被回收后继续使用旧的上下文变量,就可能导致前一个请求的数据“泄露”到下一个请求中

例如:用户 A 的身份信息出现在用户 B 的请求中、上一个请求的日志上下文污染了当前请求

如何解决

  1. 需要一个全局的 _thread_recycles: ContextVar[int] 用来标记 当前线程被“重新投入使用”的次数(即:开始处理一个新请求的次数)
  2. 需要自定义一个RecyclableContextVar (需要实现的类) 的成员变量 self._updates: ContextVar[int] 用来记录自己“更新”过多少次
  3. 当调用 RecyclableContextVar 的 get 方法时,对比上述两者的值,若_thread_recycles> _updates(线程被重复复用的次数大于当前变量的次数,有可能发生数据污染),说明是旧上下文,这时可以拒绝使用。
  4. 当调用 RecyclableContextVar 的 set 方法时,若_thread_recycles> _updates,需要将 _updates 计数更新为 _thread_recycles,确保值的正确插入。若 _thread_recycles = _updates 则更新 _updates

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from contextvars import ContextVar  # [CALUDE-GEN] Python 3.7+引入的上下文变量,用于在异步环境中传递数据
from typing import Generic, TypeVar # [CALUDE-GEN] 泛型类型支持,用于创建类型安全的容器

# [CALUDE-GEN] 定义泛型类型变量T,用于RecyclableContextVar的类型参数
T = TypeVar("T")


class HiddenValue:
"""
[CALUDE-GEN] 隐藏值标记类
用作内部标记,表示某个值不应该被外部直接使用或访问
在默认参数中使用,以区分None值和未提供值的情况
"""
pass


# [CALUDE-GEN] 创建默认的隐藏值实例,用于函数参数的默认值
_default = HiddenValue()


class RecyclableContextVar(Generic[T]):
"""
[CALUDE-GEN] 可回收上下文变量包装类

这是一个ContextVar的安全包装器,解决了在gunicorn线程池回收机制下的竞态条件问题。
在gunicorn多线程环境中,线程会被循环使用,导致ContextVar中的旧数据可能被遗留。
该类通过跟踪线程回收次数,确保数据的一致性和安全性。

注意:在处理请求之前需要调用 `increment_thread_recycles` 方法
目前不支持 `reset` 功能
"""

# [CALUDE-GEN] 类级别的上下文变量,记录当前线程的回收次数
_thread_recycles: ContextVar[int] = ContextVar("thread_recycles")

@classmethod
def increment_thread_recycles(cls):
"""
[CALUDE-GEN] 增加线程回收计数器

在每个新请求开始时调用此方法,用于标记线程已被回收并重新使用。
这个机制能够检测到线程池中的线程是否被回收,从而避免使用过期的上下文数据。
"""
try:
# [CALUDE-GEN] 尝试获取当前线程的回收次数
recycles = cls._thread_recycles.get()
# [CALUDE-GEN] 将回收次数加一
cls._thread_recycles.set(recycles + 1)
except LookupError:
# [CALUDE-GEN] 如果是第一次访问,初始化为0
cls._thread_recycles.set(0)

def __init__(self, context_var: ContextVar[T]):
"""
[CALUDE-GEN] 初始化可回收上下文变量

Args:
context_var: 要包装的原始 ContextVar 实例
"""
# [CALUDE-GEN] 保存原始ContextVar的引用
self._context_var = context_var
# [CALUDE-GEN] 创建一个跟踪更新次数的ContextVar,用于检测数据新鲜度
self._updates = ContextVar[int](context_var.name + "_updates", default=0)

def get(self, default: T | HiddenValue = _default) -> T:
"""
[CALUDE-GEN] 安全地获取上下文变量的值

该方法会检查线程是否被回收,如果是,将返回默认值或抛出异常。
这样可以避免使用已过期的上下文数据。

Args:
default: 当上下文变量不存在时的默认值

Returns:
T: 上下文变量的值或默认值

Raises:
LookupError: 当上下文变量不存在且未提供默认值时
"""
# [CALUDE-GEN] 获取当前线程的回收次数
thread_recycles = self._thread_recycles.get(0)
# [CALUDE-GEN] 获取当前实例的更新次数
self_updates = self._updates.get()
# [CALUDE-GEN] 如果线程回收次数大于更新次数,说明线程被回收过,需要更新
if thread_recycles > self_updates:
self._updates.set(thread_recycles)

# [CALUDE-GEN] 检查线程是否被回收以及是否应该更新
if thread_recycles < self_updates:
# [CALUDE-GEN] 当前上下文是有效的,返回存储的值
return self._context_var.get()
else:
# [CALUDE-GEN] thread_recycles >= self_updates,表示当前上下文已无效
if isinstance(default, HiddenValue) or default is _default:
# [CALUDE-GEN] 没有提供有效的默认值,抛出异常
raise LookupError
else:
# [CALUDE-GEN] 返回提供的默认值
return default

def set(self, value: T):
"""
[CALUDE-GEN] 安全地设置上下文变量的值

该方法会确保在设置值之前正确更新计数器,以维护数据一致性。

Args:
value: 要设置的值
"""
# [CALUDE-GEN] 如果之前从未调用过`set`方法,可能会导致self._updates小于cls._thread_recycles的情况
# 手动增加以保持一致性
thread_recycles = self._thread_recycles.get(0)
self_updates = self._updates.get()
# [CALUDE-GEN] 如果线程回收次数大于更新次数,同步更新次数
if thread_recycles > self_updates:
self._updates.set(thread_recycles)

# [CALUDE-GEN] 如果更新次数等于线程回收次数,说明是当前线程的最新操作
if self._updates.get() == self._thread_recycles.get(0):
# [CALUDE-GEN] 增加更新次数以标记这次新的设置操作
self._updates.set(self._updates.get() + 1)

# [CALUDE-GEN] 设置上下文变量的值
self._context_var.set(value)

使用时,创建 RecyclableContextVar 实例

1
2
3
4
5
6
7
# [CALUDE-GEN] 插件模型模式的线程锁,用于同步访问模型模式定义
plugin_model_schema_lock: RecyclableContextVar[Lock] = RecyclableContextVar(ContextVar("plugin_model_schema_lock"))

# [CALUDE-GEN] 插件模型模式的上下文变量,存储模型名称到AI模型实体的映射
plugin_model_schemas: RecyclableContextVar[dict[str, "AIModelEntity"]] = RecyclableContextVar(
ContextVar("plugin_model_schemas")
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
......

try:
# 尝试获取当前请求的schema缓存字典,如果还没初始化(即 LookupError),就创建一个空字典 {} 和一把锁 Lock() 并存入上下文
contexts.plugin_model_schemas.get()
except LookupError:
contexts.plugin_model_schemas.set({})
contexts.plugin_model_schema_lock.set(Lock())

# 获取当前上下文中的锁对象,并进入 with 块(即加锁),在同一请求内,防止多个线程同时修改这个请求上下文中的缓存字典(虽然通常一个请求一个线程,但在多线程处理同一请求时仍有必要)。[这不是跨请求的锁,而是当前请求内部的同步机制,确保缓存读写不冲突。]
with contexts.plugin_model_schema_lock.get():

# 带着锁的二次检查,确保 cache_key 的 schema 没有被缓存起来
# 即第二次检查是为了确保“我拿到锁的时候,别人没抢先干完活”,避免重复工作,保证缓存的幂等性和效率。
if cache_key in contexts.plugin_model_schemas.get():
return contexts.plugin_model_schemas.get()[cache_key]

# 上述操作确认了 cache_key 的 schema 没有被缓存起来
schema = plugin_model_manager.get_model_schema(
tenant_id=self.tenant_id,
user_id="unknown",
plugin_id=self.plugin_id,
provider=self.provider_name,
model_type=self.model_type.value,
model=model,
credentials=credentials or {},
)

# 如果成功获取到 schema,就把它存入当前请求的缓存字典中,键为 cache_key。下次同一个请求再次访问相同 cache_key 时,就会命中缓存,直接返回。
if schema:
contexts.plugin_model_schemas.get()[cache_key] = schema

return schema

......

整体逻辑图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
开始

├─→ 尝试获取 contexts.plugin_model_schemas
│ ↳ 成功:继续
│ ↳ 失败(LookupError):初始化为 {} 和 Lock()

├─→ 获取锁(当前请求内的锁)

├─→ 检查 cache_key 是否在缓存中?
│ ↳ 是:返回缓存值 ✅
│ ↳ 否:
│ └─→ 调用 get_model_schema() 获取真实数据
│ └─→ 如果有结果,写入缓存
│ └─→ 返回结果

问题

上述缓存方式的特点是什么?

上述缓存方式为使用 ContextVar 实现的 请求级内存缓存(Request-Scoped In-Memory Cache),数据存储在当前请求的上下文中,生命周期 = 当前请求的生命周期,不跨请求共享,典型用途:避免同一个请求内重复计算或 I/O

_thread_recycles 和 _updates 都是 ContextVar 类型的,为什么他们不会被线程复用影响?确保 RecyclableContextVar 是可复用的?

_thread_recycles 和 _updates 只要线程复用,那么他们就会有残留值,它们之所以能工作,是因为: 有一个全局版本号_thread_recycles)由外部主动递增,每次 set 都会确保 _updates 至少等于当前版本,通常还会 +1,get 时通过比较版本号,判断数据是否属于“当前生命周期”。所以,它不是靠“不残留”来安全,而是靠“残留了也能发现并修复”来安全。

即:它们不会“免疫”线程复用的影响,而是通过“主动更新机制”来识别并修复污染。它们不是“不会被影响”,而是“被影响了也能自我纠正”。

和其他类型的缓存的对比

缓存类型 示例 作用域 声明周期 是否持久 并发安全 典型用途
ContextVar 请求级缓存 contexts.plugin_model_schemas 单个请求内 请求开始 → 结束 ❌ 否 ✅ 是 避免同一请求内重复加载
线程局部缓存 threading.local() 单个线程内 线程存活期间 ❌ 否 ⚠️ 同线程安全,但线程复用会污染 旧式 Web 框架用
进程内存缓存 lru_cache, 全局 dict 所有请求共享 进程运行期间 ❌ 否 ❌ 否(需手动加锁) 提升整体性能,减少重复计算
分布式缓存 Redis, Memcached 所有服务实例共享 手动设置(TTL) ✅ 可持久 ✅ 是(服务端保证) 跨服务共享数据、会话存储

总结

  1. 上述例子中,实现了 请求级线程安全缓存机制,用于缓存插件模型的 schema,避免重复加载,提升性能。(同一个请求中多次访问同一 model schema,只查一次)
  2. 避免了线程池复用带来的 ContextVar 污染的问题(每个请求有自己的ContextVar,互不干扰,不会跨请求污染)

参考

  1. dify/api/contexts/wrapper.py at main · langgenius/dify