你的 CrewAI 流水线里 3 个 Agent 共用一个 Redis key workflow:state 作为共享黑板(blackboard),每个 Agent 读取当前状态、添加自己的结论后写回。并发运行时,Agent A 和 Agent B 都在同一时刻读到了版本 5 的状态,Agent A 写入了自己的结论变成版本 6,然后 Agent B 也写入了自己的结论(基于版本 5 的旧内容),直接覆盖了 Agent A 的改动,版本仍然是 6 但 A 的结论消失了。或者在 AutoGen 的 GroupChat 里,多个 Coder Agent 共用一个内存 dict 保存「已修复的 bug 列表」,并发写入时字典的内部状态损坏(Python dict 在 CPython 里不是线程安全的)。
常见原因
1. 读-改-写操作没有原子性保护
Agent 的操作模式是「读取当前状态 → 在本地修改 → 写回」,但这三步之间没有任何互斥机制。两个 Agent 读到相同的旧状态,各自修改,分别写回,后写入者覆盖前写入者的改动。这是经典的「丢失更新」(Lost Update)问题。
怎么判断:查看共享状态的读写代码,是否有 GETSET(原子读写)、事务、或锁保护。如果是裸的 get + set,就存在丢失更新风险。
2. 使用了非线程安全的数据结构
Python 的 dict 在 CPython 里受 GIL 保护,单次 d[k] = v 操作是线程安全的,但「读取-判断-写入」的复合操作不是。如果用了 collections.defaultdict、list(.append() 是安全的,但 l += [x] 不是)或自定义类,线程安全性需要额外验证。
怎么判断:确认共享数据结构的类型,查阅其线程安全性文档。对于 asyncio 场景,await 点之间是安全的,但两个 await 之间的复合操作仍然不安全。
3. 消息历史被多个 Agent 并发 append
LangGraph 的 MessagesState 里的 messages 列表被多个并行节点同时 append,两个节点各自读到长度为 10 的列表,各自写入一条消息后把长度为 11 的列表写回,其中一条消息被覆盖,最终列表长度仍然是 11 而不是 12。
怎么判断:在并行节点里统计写入 messages 的次数,与实际 messages 列表的增长量对比。如果增长量少于写入次数,就发生了覆盖。
4. Redis 的 WATCH/MULTI/EXEC 事务被绕过
代码使用了 Redis 事务(WATCH + MULTI + EXEC)来防止竞态,但某些更新路径(如错误处理分支)直接用了裸的 SET,绕过了事务保护。
怎么判断:搜索所有对共享 Redis key 的写入点,检查是否所有写入都在事务里,还是有部分路径直接 SET。
5. 乐观锁的版本号检查没有在原子操作里
用「比较并交换」(CAS)模式防止竞态:读取版本号,修改数据,检查版本号没变才写入。但「检查版本号」和「写入」两步之间有时间窗口,在这个窗口里另一个 Agent 可以改变版本号,导致 CAS 失效。
怎么判断:检查 CAS 逻辑是否用了原子操作(Redis 的 WATCH + EXEC,或数据库的 UPDATE ... WHERE version=N AND ...),而不是两次独立的读写操作。
6. 内存分区不够细,多个 Agent 竞争同一个粒度的锁
所有 Agent 共用一个「全局写锁」,每次更新需要获取这个锁。粒度太粗导致并发度低(Agent 大量时间在等锁),但如果细化了锁的粒度,又容易出现锁的遗漏或竞态条件。
怎么判断:统计 Agent 等待锁的平均时间,如果超过业务容忍的延迟,锁的粒度可能需要细化。
最短修复路径
Step 1:用 Redis WATCH + MULTI + EXEC 实现乐观锁
import redis
import json
def atomic_update_state(r: redis.Redis, key: str, updater_fn, max_retries: int = 5):
"""乐观锁更新:如果版本在读写之间被修改,自动重试。"""
for attempt in range(max_retries):
with r.pipeline() as pipe:
try:
# WATCH 监视 key,如果在 EXEC 之前 key 被其他客户端修改,EXEC 会失败
pipe.watch(key)
# 读取当前状态
raw = pipe.get(key)
state = json.loads(raw) if raw else {}
# 业务逻辑修改(在 pipeline 外执行,不受 WATCH 保护)
new_state = updater_fn(state)
# 开启事务,原子写入
pipe.multi()
pipe.set(key, json.dumps(new_state))
pipe.execute() # 如果 WATCH 的 key 被改动,这里会抛 WatchError
return new_state
except redis.WatchError:
# 另一个 Agent 在我们读取后修改了 key,重试
if attempt == max_retries - 1:
raise RuntimeError(f"乐观锁冲突:重试 {max_retries} 次后仍然失败")
continue
# 使用:Agent A 添加自己的结论
def add_agent_conclusion(state: dict) -> dict:
state.setdefault("conclusions", [])
state["conclusions"].append({"agent": "agent_a", "result": "..."})
return state
atomic_update_state(redis_client, "workflow:state", add_agent_conclusion)
Step 2:用分区(Namespace)隔离不同 Agent 的写入空间
# 每个 Agent 只写自己的命名空间,不写公共命名空间
# workflow:state -> 全局只读配置
# workflow:agent_a:output -> 只有 Agent A 写
# workflow:agent_b:output -> 只有 Agent B 写
# workflow:aggregated -> 由 Aggregator Agent 汇总
AGENT_NAMESPACES = {
"agent_a": "workflow:agent_a:output",
"agent_b": "workflow:agent_b:output",
"agent_c": "workflow:agent_c:output",
}
def write_agent_output(agent_id: str, output: dict, r: redis.Redis):
"""每个 Agent 只写自己的命名空间,完全消除写竞争。"""
key = AGENT_NAMESPACES[agent_id]
r.set(key, json.dumps(output)) # 无需锁:只有一个 Agent 写这个 key
def aggregate_outputs(r: redis.Redis) -> dict:
"""Aggregator 在所有 Agent 完成后读取各自的结果并汇总,此时没有并发写入。"""
results = {}
for agent_id, key in AGENT_NAMESPACES.items():
raw = r.get(key)
if raw:
results[agent_id] = json.loads(raw)
return results
Step 3:LangGraph 并行节点使用 Reducer 函数合并状态
from typing import Annotated
import operator
from langgraph.graph import StateGraph
# LangGraph 的 Reducer:并行节点各自返回 list,框架自动 merge 而不是覆盖
class WorkflowState(TypedDict):
# 用 operator.add 作为 reducer:并行节点的 messages 列表会被合并,不会覆盖
messages: Annotated[list, operator.add]
# 如果需要自定义 merge 逻辑
conclusions: Annotated[list[dict], lambda a, b: a + b]
# 并行节点只需返回自己的增量,不需要读取当前完整状态
def agent_a_node(state: WorkflowState) -> dict:
return {"conclusions": [{"agent": "a", "result": "..."}]} # 只返回增量
def agent_b_node(state: WorkflowState) -> dict:
return {"conclusions": [{"agent": "b", "result": "..."}]} # 只返回增量
# LangGraph 会自动把两个节点的 conclusions 合并
Step 4:为共享字典操作加 asyncio.Lock
import asyncio
class SharedMemory:
"""线程/协程安全的共享内存,所有读写通过 lock 保护。"""
def __init__(self):
self._data: dict = {}
self._lock = asyncio.Lock()
async def update(self, key: str, value) -> None:
async with self._lock:
self._data[key] = value
async def append_to_list(self, key: str, item) -> None:
async with self._lock:
self._data.setdefault(key, [])
self._data[key].append(item)
async def get(self, key: str, default=None):
async with self._lock:
return self._data.get(key, default)
async def atomic_update(self, key: str, updater):
"""读-改-写的原子操作。"""
async with self._lock:
current = self._data.get(key)
new_value = updater(current)
self._data[key] = new_value
return new_value
预防建议
- 设计多 Agent 系统时,优先使用「每个 Agent 写独立命名空间」的分区模型,消除写竞争,而不是用锁管理共享写入
- LangGraph 的并行节点状态合并必须使用 Annotated Reducer,不要在并行节点里直接读写完整的 state
- Redis 的共享状态更新必须用
WATCH+MULTI+EXEC事务,不允许裸的GET + SET - asyncio 场景下,两个
await之间的复合操作(读-判断-写)必须放在asyncio.Lock保护的临界区里 - 在代码 review 时,对所有对共享变量的写操作打「并发安全」标签,reviewer 重点检查这些位置
- 为共享内存的更新操作写并发测试(如用
asyncio.gather同时启动 100 个 writer),验证没有数据丢失 - 定期审计共享内存的写入模式,识别高竞争的 key,考虑将其拆分或转为分区模型
常见问答 (FAQ)
Q: Python 的 GIL 是否保护了 dict 的并发写入?
A: GIL 保证单个字节码操作的原子性,d[k] = v 这样的简单赋值是安全的。但「读-判断-写」的复合操作(如 if k not in d: d[k] = v)不是原子的——GIL 可能在 in 检查和 d[k] = v 之间被另一个线程拿走。在 asyncio 里没有 GIL 保护(协程是协作调度的),await 点就是可能被「抢占」的切换点。
Q: LangGraph 的 Reducer 和手动加锁有什么本质区别? A: Reducer 是无锁的设计:每个并行节点返回自己的增量(而不是完整状态),框架在 fan-in 节点串行地合并所有增量。因为没有并发写入,所以不需要锁。手动加锁是有锁的设计:允许并发写入,但通过互斥保证串行化。Reducer 方案在性能上更优(没有锁争用),在正确性上也更容易推理。
Q: Redis 的 Lua 脚本和 WATCH + MULTI + EXEC 哪个更推荐用于乐观锁?
A: Redis Lua 脚本更推荐:Lua 脚本在 Redis 服务器上原子执行,不存在网络往返导致的时间窗口问题,也不需要客户端重试逻辑。WATCH + MULTI + EXEC 需要客户端在 WatchError 时重试,增加了编程复杂度。对于 CAS 操作,用 Lua 脚本实现一次往返完成比 3 次往返(WATCH + 读 + MULTI/SET/EXEC)更高效。
Q: 如何检测生产环境里是否正在发生共享内存写竞争? A: 在写操作里记录版本号(每次成功写入递增),同时记录「写入时读到的版本号」和「实际写入的版本号」。如果两者之间有间隙(如读到版本 5,写入时已经是版本 7),说明有 2 次其他 Agent 的写入被你读到的旧版本覆盖了。这个间隙就是竞态的直接证据。