需求
需要实现一个请求级线程安全的缓存,例如在一个请求中,会对反复使用到某个数据库的值,或者文件中的值,这时我们可以使用 ContextVar 来创建一个全局变量对改值进行缓存。但也会出现一些问题。
问题
使用 gunicorn 部署的 Flask/Django/FastAPI 应用,工作模式为 sync
或 gevent
(尤其是启用了 threads > 1
),使用 ContextVar
来存储请求级别的上下文,就可能出现线程复用导致的数据污染。
在 gunicorn 等 WSGI 服务器中使用多线程时,线程会被复用(回收) 来处理多个请求。而 ContextVar
是绑定在线程上的,如果一个线程被回收后继续使用旧的上下文变量,就可能导致前一个请求的数据“泄露”到下一个请求中。
例如:用户 A 的身份信息出现在用户 B 的请求中、上一个请求的日志上下文污染了当前请求
如何解决
- 需要一个全局的
_thread_recycles: ContextVar[int]
用来标记 当前线程被“重新投入使用”的次数(即:开始处理一个新请求的次数)
- 需要自定义一个
RecyclableContextVar
(需要实现的类) 的成员变量 self._updates: ContextVar[int]
用来记录自己“更新”过多少次
- 当调用
RecyclableContextVar
的 get 方法时,对比上述两者的值,若_thread_recycles> _updates
(线程被重复复用的次数大于当前变量的次数,有可能发生数据污染),说明是旧上下文,这时可以拒绝使用。
- 当调用
RecyclableContextVar
的 set 方法时,若_thread_recycles> _updates
,需要将 _updates 计数更新为 _thread_recycles,确保值的正确插入。若 _thread_recycles = _updates
则更新 _updates
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| from contextvars import ContextVar from typing import Generic, TypeVar
T = TypeVar("T")
class HiddenValue: """ [CALUDE-GEN] 隐藏值标记类 用作内部标记,表示某个值不应该被外部直接使用或访问 在默认参数中使用,以区分None值和未提供值的情况 """ pass
_default = HiddenValue()
class RecyclableContextVar(Generic[T]): """ [CALUDE-GEN] 可回收上下文变量包装类 这是一个ContextVar的安全包装器,解决了在gunicorn线程池回收机制下的竞态条件问题。 在gunicorn多线程环境中,线程会被循环使用,导致ContextVar中的旧数据可能被遗留。 该类通过跟踪线程回收次数,确保数据的一致性和安全性。 注意:在处理请求之前需要调用 `increment_thread_recycles` 方法 目前不支持 `reset` 功能 """
_thread_recycles: ContextVar[int] = ContextVar("thread_recycles")
@classmethod def increment_thread_recycles(cls): """ [CALUDE-GEN] 增加线程回收计数器 在每个新请求开始时调用此方法,用于标记线程已被回收并重新使用。 这个机制能够检测到线程池中的线程是否被回收,从而避免使用过期的上下文数据。 """ try: recycles = cls._thread_recycles.get() cls._thread_recycles.set(recycles + 1) except LookupError: cls._thread_recycles.set(0)
def __init__(self, context_var: ContextVar[T]): """ [CALUDE-GEN] 初始化可回收上下文变量 Args: context_var: 要包装的原始 ContextVar 实例 """ self._context_var = context_var self._updates = ContextVar[int](context_var.name + "_updates", default=0)
def get(self, default: T | HiddenValue = _default) -> T: """ [CALUDE-GEN] 安全地获取上下文变量的值 该方法会检查线程是否被回收,如果是,将返回默认值或抛出异常。 这样可以避免使用已过期的上下文数据。 Args: default: 当上下文变量不存在时的默认值 Returns: T: 上下文变量的值或默认值 Raises: LookupError: 当上下文变量不存在且未提供默认值时 """ thread_recycles = self._thread_recycles.get(0) self_updates = self._updates.get() if thread_recycles > self_updates: self._updates.set(thread_recycles)
if thread_recycles < self_updates: return self._context_var.get() else: if isinstance(default, HiddenValue) or default is _default: raise LookupError else: return default
def set(self, value: T): """ [CALUDE-GEN] 安全地设置上下文变量的值 该方法会确保在设置值之前正确更新计数器,以维护数据一致性。 Args: value: 要设置的值 """ thread_recycles = self._thread_recycles.get(0) self_updates = self._updates.get() if thread_recycles > self_updates: self._updates.set(thread_recycles)
if self._updates.get() == self._thread_recycles.get(0): self._updates.set(self._updates.get() + 1)
self._context_var.set(value)
|
使用时,创建 RecyclableContextVar 实例
1 2 3 4 5 6 7
| plugin_model_schema_lock: RecyclableContextVar[Lock] = RecyclableContextVar(ContextVar("plugin_model_schema_lock"))
plugin_model_schemas: RecyclableContextVar[dict[str, "AIModelEntity"]] = RecyclableContextVar( ContextVar("plugin_model_schemas") )
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| ......
try: contexts.plugin_model_schemas.get() except LookupError: contexts.plugin_model_schemas.set({}) contexts.plugin_model_schema_lock.set(Lock())
with contexts.plugin_model_schema_lock.get(): if cache_key in contexts.plugin_model_schemas.get(): return contexts.plugin_model_schemas.get()[cache_key] schema = plugin_model_manager.get_model_schema( tenant_id=self.tenant_id, user_id="unknown", plugin_id=self.plugin_id, provider=self.provider_name, model_type=self.model_type.value, model=model, credentials=credentials or {}, ) if schema: contexts.plugin_model_schemas.get()[cache_key] = schema
return schema ......
|
整体逻辑图
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 开始 │ ├─→ 尝试获取 contexts.plugin_model_schemas │ ↳ 成功:继续 │ ↳ 失败(LookupError):初始化为 {} 和 Lock() │ ├─→ 获取锁(当前请求内的锁) │ ├─→ 检查 cache_key 是否在缓存中? │ ↳ 是:返回缓存值 ✅ │ ↳ 否: │ └─→ 调用 get_model_schema() 获取真实数据 │ └─→ 如果有结果,写入缓存 │ └─→ 返回结果
|
问题
上述缓存方式的特点是什么?
上述缓存方式为使用 ContextVar
实现的 请求级内存缓存(Request-Scoped In-Memory Cache),数据存储在当前请求的上下文中,生命周期 = 当前请求的生命周期,不跨请求共享,典型用途:避免同一个请求内重复计算或 I/O
_thread_recycles 和 _updates 都是 ContextVar 类型的,为什么他们不会被线程复用影响?确保 RecyclableContextVar 是可复用的?
_thread_recycles 和 _updates 只要线程复用,那么他们就会有残留值,它们之所以能工作,是因为: 有一个全局版本号(_thread_recycles
)由外部主动递增,每次 set
都会确保 _updates
至少等于当前版本,通常还会 +1,get
时通过比较版本号,判断数据是否属于“当前生命周期”。所以,它不是靠“不残留”来安全,而是靠“残留了也能发现并修复”来安全。
即:它们不会“免疫”线程复用的影响,而是通过“主动更新机制”来识别并修复污染。它们不是“不会被影响”,而是“被影响了也能自我纠正”。
和其他类型的缓存的对比
缓存类型 |
示例 |
作用域 |
声明周期 |
是否持久 |
并发安全 |
典型用途 |
ContextVar 请求级缓存 |
contexts.plugin_model_schemas |
单个请求内 |
请求开始 → 结束 |
❌ 否 |
✅ 是 |
避免同一请求内重复加载 |
线程局部缓存 |
threading.local() |
单个线程内 |
线程存活期间 |
❌ 否 |
⚠️ 同线程安全,但线程复用会污染 |
旧式 Web 框架用 |
进程内存缓存 |
lru_cache , 全局 dict |
所有请求共享 |
进程运行期间 |
❌ 否 |
❌ 否(需手动加锁) |
提升整体性能,减少重复计算 |
分布式缓存 |
Redis, Memcached |
所有服务实例共享 |
手动设置(TTL) |
✅ 可持久 |
✅ 是(服务端保证) |
跨服务共享数据、会话存储 |
总结
- 上述例子中,实现了 请求级线程安全缓存机制,用于缓存插件模型的 schema,避免重复加载,提升性能。(同一个请求中多次访问同一 model schema,只查一次)
- 避免了线程池复用带来的 ContextVar 污染的问题(每个请求有自己的
ContextVar
,互不干扰,不会跨请求污染)
参考
- dify/api/contexts/wrapper.py at main · langgenius/dify