共享 memory 被多 Agent 写覆盖

多个 Agent 并发读写同一块共享 memory(如 Redis key、共享字典、消息历史),后写入的 Agent 覆盖了之前 Agent 的重要更新,导致状态不一致或数据丢失。本文分析写竞争根因并给出乐观锁和分区隔离方案。

你的 CrewAI 流水线里 3 个 Agent 共用一个 Redis key workflow:state 作为共享黑板(blackboard),每个 Agent 读取当前状态、添加自己的结论后写回。并发运行时,Agent A 和 Agent B 都在同一时刻读到了版本 5 的状态,Agent A 写入了自己的结论变成版本 6,然后 Agent B 也写入了自己的结论(基于版本 5 的旧内容),直接覆盖了 Agent A 的改动,版本仍然是 6 但 A 的结论消失了。或者在 AutoGen 的 GroupChat 里,多个 Coder Agent 共用一个内存 dict 保存「已修复的 bug 列表」,并发写入时字典的内部状态损坏(Python dict 在 CPython 里不是线程安全的)。

常见原因

1. 读-改-写操作没有原子性保护

Agent 的操作模式是「读取当前状态 → 在本地修改 → 写回」,但这三步之间没有任何互斥机制。两个 Agent 读到相同的旧状态,各自修改,分别写回,后写入者覆盖前写入者的改动。这是经典的「丢失更新」(Lost Update)问题。

怎么判断:查看共享状态的读写代码,是否有 GETSET(原子读写)、事务、或锁保护。如果是裸的 get + set,就存在丢失更新风险。

2. 使用了非线程安全的数据结构

Python 的 dict 在 CPython 里受 GIL 保护,单次 d[k] = v 操作是线程安全的,但「读取-判断-写入」的复合操作不是。如果用了 collections.defaultdictlist.append() 是安全的,但 l += [x] 不是)或自定义类,线程安全性需要额外验证。

怎么判断:确认共享数据结构的类型,查阅其线程安全性文档。对于 asyncio 场景,await 点之间是安全的,但两个 await 之间的复合操作仍然不安全。

3. 消息历史被多个 Agent 并发 append

LangGraph 的 MessagesState 里的 messages 列表被多个并行节点同时 append,两个节点各自读到长度为 10 的列表,各自写入一条消息后把长度为 11 的列表写回,其中一条消息被覆盖,最终列表长度仍然是 11 而不是 12。

怎么判断:在并行节点里统计写入 messages 的次数,与实际 messages 列表的增长量对比。如果增长量少于写入次数,就发生了覆盖。

4. Redis 的 WATCH/MULTI/EXEC 事务被绕过

代码使用了 Redis 事务(WATCH + MULTI + EXEC)来防止竞态,但某些更新路径(如错误处理分支)直接用了裸的 SET,绕过了事务保护。

怎么判断:搜索所有对共享 Redis key 的写入点,检查是否所有写入都在事务里,还是有部分路径直接 SET

5. 乐观锁的版本号检查没有在原子操作里

用「比较并交换」(CAS)模式防止竞态:读取版本号,修改数据,检查版本号没变才写入。但「检查版本号」和「写入」两步之间有时间窗口,在这个窗口里另一个 Agent 可以改变版本号,导致 CAS 失效。

怎么判断:检查 CAS 逻辑是否用了原子操作(Redis 的 WATCH + EXEC,或数据库的 UPDATE ... WHERE version=N AND ...),而不是两次独立的读写操作。

6. 内存分区不够细,多个 Agent 竞争同一个粒度的锁

所有 Agent 共用一个「全局写锁」,每次更新需要获取这个锁。粒度太粗导致并发度低(Agent 大量时间在等锁),但如果细化了锁的粒度,又容易出现锁的遗漏或竞态条件。

怎么判断:统计 Agent 等待锁的平均时间,如果超过业务容忍的延迟,锁的粒度可能需要细化。

最短修复路径

Step 1:用 Redis WATCH + MULTI + EXEC 实现乐观锁

import redis
import json

def atomic_update_state(r: redis.Redis, key: str, updater_fn, max_retries: int = 5):
    """乐观锁更新:如果版本在读写之间被修改,自动重试。"""
    for attempt in range(max_retries):
        with r.pipeline() as pipe:
            try:
                # WATCH 监视 key,如果在 EXEC 之前 key 被其他客户端修改,EXEC 会失败
                pipe.watch(key)
                
                # 读取当前状态
                raw = pipe.get(key)
                state = json.loads(raw) if raw else {}
                
                # 业务逻辑修改(在 pipeline 外执行,不受 WATCH 保护)
                new_state = updater_fn(state)
                
                # 开启事务,原子写入
                pipe.multi()
                pipe.set(key, json.dumps(new_state))
                pipe.execute()  # 如果 WATCH 的 key 被改动,这里会抛 WatchError
                return new_state
                
            except redis.WatchError:
                # 另一个 Agent 在我们读取后修改了 key,重试
                if attempt == max_retries - 1:
                    raise RuntimeError(f"乐观锁冲突:重试 {max_retries} 次后仍然失败")
                continue

# 使用:Agent A 添加自己的结论
def add_agent_conclusion(state: dict) -> dict:
    state.setdefault("conclusions", [])
    state["conclusions"].append({"agent": "agent_a", "result": "..."})
    return state

atomic_update_state(redis_client, "workflow:state", add_agent_conclusion)

Step 2:用分区(Namespace)隔离不同 Agent 的写入空间

# 每个 Agent 只写自己的命名空间,不写公共命名空间
# workflow:state -> 全局只读配置
# workflow:agent_a:output -> 只有 Agent A 写
# workflow:agent_b:output -> 只有 Agent B 写
# workflow:aggregated -> 由 Aggregator Agent 汇总

AGENT_NAMESPACES = {
    "agent_a": "workflow:agent_a:output",
    "agent_b": "workflow:agent_b:output",
    "agent_c": "workflow:agent_c:output",
}

def write_agent_output(agent_id: str, output: dict, r: redis.Redis):
    """每个 Agent 只写自己的命名空间,完全消除写竞争。"""
    key = AGENT_NAMESPACES[agent_id]
    r.set(key, json.dumps(output))  # 无需锁:只有一个 Agent 写这个 key

def aggregate_outputs(r: redis.Redis) -> dict:
    """Aggregator 在所有 Agent 完成后读取各自的结果并汇总,此时没有并发写入。"""
    results = {}
    for agent_id, key in AGENT_NAMESPACES.items():
        raw = r.get(key)
        if raw:
            results[agent_id] = json.loads(raw)
    return results

Step 3:LangGraph 并行节点使用 Reducer 函数合并状态

from typing import Annotated
import operator
from langgraph.graph import StateGraph

# LangGraph 的 Reducer:并行节点各自返回 list,框架自动 merge 而不是覆盖
class WorkflowState(TypedDict):
    # 用 operator.add 作为 reducer:并行节点的 messages 列表会被合并,不会覆盖
    messages: Annotated[list, operator.add]
    
    # 如果需要自定义 merge 逻辑
    conclusions: Annotated[list[dict], lambda a, b: a + b]

# 并行节点只需返回自己的增量,不需要读取当前完整状态
def agent_a_node(state: WorkflowState) -> dict:
    return {"conclusions": [{"agent": "a", "result": "..."}]}  # 只返回增量

def agent_b_node(state: WorkflowState) -> dict:
    return {"conclusions": [{"agent": "b", "result": "..."}]}  # 只返回增量

# LangGraph 会自动把两个节点的 conclusions 合并

Step 4:为共享字典操作加 asyncio.Lock

import asyncio

class SharedMemory:
    """线程/协程安全的共享内存,所有读写通过 lock 保护。"""
    
    def __init__(self):
        self._data: dict = {}
        self._lock = asyncio.Lock()
    
    async def update(self, key: str, value) -> None:
        async with self._lock:
            self._data[key] = value
    
    async def append_to_list(self, key: str, item) -> None:
        async with self._lock:
            self._data.setdefault(key, [])
            self._data[key].append(item)
    
    async def get(self, key: str, default=None):
        async with self._lock:
            return self._data.get(key, default)
    
    async def atomic_update(self, key: str, updater):
        """读-改-写的原子操作。"""
        async with self._lock:
            current = self._data.get(key)
            new_value = updater(current)
            self._data[key] = new_value
            return new_value

预防建议

  • 设计多 Agent 系统时,优先使用「每个 Agent 写独立命名空间」的分区模型,消除写竞争,而不是用锁管理共享写入
  • LangGraph 的并行节点状态合并必须使用 Annotated Reducer,不要在并行节点里直接读写完整的 state
  • Redis 的共享状态更新必须用 WATCH + MULTI + EXEC 事务,不允许裸的 GET + SET
  • asyncio 场景下,两个 await 之间的复合操作(读-判断-写)必须放在 asyncio.Lock 保护的临界区里
  • 在代码 review 时,对所有对共享变量的写操作打「并发安全」标签,reviewer 重点检查这些位置
  • 为共享内存的更新操作写并发测试(如用 asyncio.gather 同时启动 100 个 writer),验证没有数据丢失
  • 定期审计共享内存的写入模式,识别高竞争的 key,考虑将其拆分或转为分区模型

常见问答 (FAQ)

Q: Python 的 GIL 是否保护了 dict 的并发写入? A: GIL 保证单个字节码操作的原子性,d[k] = v 这样的简单赋值是安全的。但「读-判断-写」的复合操作(如 if k not in d: d[k] = v)不是原子的——GIL 可能在 in 检查和 d[k] = v 之间被另一个线程拿走。在 asyncio 里没有 GIL 保护(协程是协作调度的),await 点就是可能被「抢占」的切换点。

Q: LangGraph 的 Reducer 和手动加锁有什么本质区别? A: Reducer 是无锁的设计:每个并行节点返回自己的增量(而不是完整状态),框架在 fan-in 节点串行地合并所有增量。因为没有并发写入,所以不需要锁。手动加锁是有锁的设计:允许并发写入,但通过互斥保证串行化。Reducer 方案在性能上更优(没有锁争用),在正确性上也更容易推理。

Q: Redis 的 Lua 脚本和 WATCH + MULTI + EXEC 哪个更推荐用于乐观锁? A: Redis Lua 脚本更推荐:Lua 脚本在 Redis 服务器上原子执行,不存在网络往返导致的时间窗口问题,也不需要客户端重试逻辑。WATCH + MULTI + EXEC 需要客户端在 WatchError 时重试,增加了编程复杂度。对于 CAS 操作,用 Lua 脚本实现一次往返完成比 3 次往返(WATCH + 读 + MULTI/SET/EXEC)更高效。

Q: 如何检测生产环境里是否正在发生共享内存写竞争? A: 在写操作里记录版本号(每次成功写入递增),同时记录「写入时读到的版本号」和「实际写入的版本号」。如果两者之间有间隙(如读到版本 5,写入时已经是版本 7),说明有 2 次其他 Agent 的写入被你读到的旧版本覆盖了。这个间隙就是竞态的直接证据。

相关阅读

标签: #AI 编程 #Agents #排查