Agent 编排器互相等待出现死锁

多 Agent 系统中两个或多个 Agent 互相等待对方的输出,整个流水线挂起不前进。本文分析死锁成因并提供超时、优先级和拓扑检测方案。

你的 CrewAI 流水线已经静止了 20 分钟,没有任何日志输出。查看 Trace 发现:Research Agent 在等待 Validator Agent 确认数据格式,而 Validator Agent 在等待 Research Agent 先提供样例数据——两者都在挂起状态。或者在 AutoGen 的 GroupChat 里,两个 Coder 都在等对方先写共享接口定义,谁也不肯先动。这类循环依赖在 LangGraph、Temporal 的复杂工作流里同样容易出现,尤其当任务依赖关系是在运行时动态生成的时候。

常见原因

1. 依赖图存在循环

最直接的原因。Agent A 的 task 依赖 Agent B 的输出,Agent B 的 task 又依赖 Agent A 的输出。在静态 DAG 框架(LangGraph)里这会被拓扑排序检测到,但在动态生成任务依赖的场景里(如 AutoGen 的函数调用链),循环依赖很容易被忽视。

怎么判断:把所有 Agent 的输入依赖画成有向图,用拓扑排序检测是否有环。如果有环,就是死锁根因。

2. 资源锁的获取顺序不一致

Agent A 先获取文件锁 X 再请求锁 Y,Agent B 先获取锁 Y 再请求锁 X。这是经典的「哲学家就餐」问题在 Agent 编排里的复现。

怎么判断:记录每个 Agent 获取锁的顺序和时间戳。如果两个 Agent 持有对方需要的锁,且都在等待,就是锁顺序问题。

3. 消息通道满了导致阻塞

在使用有界队列的框架里(如自定义的 asyncio.Queue 或 Redis list),发送方填满了队列后阻塞等待消费者读取,而消费者本身也在等待发送方的另一条消息,形成双向阻塞。

怎么判断:检查消息队列的当前长度。如果发送方的队列满了(length == maxsize),且接收方也在等待,就是这个问题。

4. 同步调用嵌套在异步上下文里

async 函数里用 asyncio.run() 调用另一个协程,或者在事件循环线程里调用阻塞 I/O,导致事件循环被占用,其他等待中的协程无法推进。

怎么判断:用 asyncio.get_event_loop().is_running() 检查,或者用 py-spy 采样堆栈,看是否有协程在 asyncio.run() 里阻塞。

5. 人工审批节点没有超时

Temporal 或 Inngest 里的 Human-in-the-loop 节点等待人工确认,但没有设置超时。如果审批人没有响应,后续所有依赖这个节点输出的 Agent 都会无限等待。

怎么判断:检查 Human approval 节点的配置,是否有 timeoutdeadline 设置。

6. 双向 handoff 协议设计错误

两个 Agent 都实现了「先等对方发 ready 信号再开始」的协议,但没有规定谁先发。两者都在等「ready」信号,但都没有发出「ready」。

怎么判断:查看两个 Agent 的 handoff 协议代码,是否存在双方都在 wait_for_ready() 而没有 send_ready() 的逻辑。

最短修复路径

Step 1:加全局超时,避免无限等待

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def agent_timeout(seconds: int, task_name: str):
    """对任何 Agent 调用加超时保护。"""
    try:
        async with asyncio.timeout(seconds):
            yield
    except asyncio.TimeoutError:
        raise DeadlockSuspected(
            f"任务 '{task_name}' 超过 {seconds}s 无响应,疑似死锁"
        )

# 使用
async with agent_timeout(300, "research_agent"):
    result = await research_agent.run(task)

Step 2:运行前做依赖图拓扑排序

from collections import defaultdict, deque

def detect_cycle(dependencies: dict[str, list[str]]) -> list[str]:
    """Kahn 算法检测有向图中的环,返回造成环的节点列表。"""
    in_degree = defaultdict(int)
    for node, deps in dependencies.items():
        for dep in deps:
            in_degree[node] += 1
    
    queue = deque([n for n in dependencies if in_degree[n] == 0])
    visited = 0
    
    while queue:
        node = queue.popleft()
        visited += 1
        for neighbor in dependencies.get(node, []):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    if visited < len(dependencies):
        cycle_nodes = [n for n in dependencies if in_degree[n] > 0]
        return cycle_nodes
    return []

# 在 workflow 启动前检测
deps = {
    "research": ["validator"],  # research 等 validator
    "validator": ["research"],  # validator 等 research —— 这是环!
}
cycle = detect_cycle(deps)
if cycle:
    raise ValueError(f"依赖图有环,涉及节点:{cycle}")

Step 3:统一锁的获取顺序

import threading

# 定义全局锁的规范获取顺序(按名称排序)
LOCK_REGISTRY: dict[str, threading.Lock] = {}

def acquire_locks_in_order(*lock_names: str):
    """按规范顺序获取多个锁,防止死锁。"""
    sorted_names = sorted(lock_names)  # 所有人按字母序获取
    locks = [LOCK_REGISTRY.setdefault(name, threading.Lock()) for name in sorted_names]
    acquired = []
    try:
        for lock in locks:
            lock.acquire(timeout=10)
            acquired.append(lock)
        yield
    finally:
        for lock in reversed(acquired):
            lock.release()

Step 4:Temporal Human-approval 节点加 deadline

# Temporal workflow
from temporalio import workflow
from datetime import timedelta

@workflow.defn
class ApprovalWorkflow:
    @workflow.run
    async def run(self, request: ApprovalRequest) -> str:
        try:
            # 设置 72 小时审批超时
            result = await workflow.wait_condition(
                lambda: self._approved is not None,
                timeout=timedelta(hours=72)
            )
            return self._approved
        except asyncio.TimeoutError:
            # 超时后自动拒绝或走默认路径
            return "auto_rejected_timeout"

Step 5:用 asyncio.gather 打破双向等待

# 两个 Agent 互相等待时,改为并发启动,各自独立运行
results = await asyncio.gather(
    agent_a.run_without_waiting_for_b(),
    agent_b.run_without_waiting_for_a(),
    return_exceptions=True
)
# 两者都完成后再进行协调
coordinator.reconcile(results[0], results[1])

预防建议

  • 所有 workflow 启动前运行拓扑排序检测,有环直接拒绝启动
  • 为每个 Agent 调用设置显式超时(建议生产环境不超过 10 分钟),超时后触发告警而不是无限等待
  • 定义全局锁获取规范顺序(如按资源 ID 字母序),所有 Agent 必须遵守这个顺序
  • Human-in-the-loop 节点必须设置超时和默认降级路径,不能无限等待人工响应
  • 在 Trace 里监控「等待时间」指标,超过阈值(如 60 秒)时发出告警
  • 避免在 Agent 协议里设计双向握手(A 等 B 的 ready,B 也等 A 的 ready),改用单向触发(A 先启动,完成后通知 B)
  • 定期审查 Agent 依赖关系图,防止功能迭代中引入新的循环依赖

常见问答 (FAQ)

Q: LangGraph 会自动检测死锁吗? A: LangGraph 的 compile() 方法会对静态定义的 edge 做拓扑排序,能检测显式定义的循环依赖。但如果依赖关系是通过 conditional edge 在运行时动态决定的,LangGraph 无法提前检测。这种情况需要在代码层加超时保护。

Q: 死锁发生时如何不重启整个 workflow 就恢复? A: 如果用了 Temporal 或 LangGraph 的 checkpoint,可以识别出卡住的节点,手动发送一个「打破等待」的信号(如 Temporal 的 signal),让其中一个节点以空输入继续执行,然后在 coordinator 里做异常处理。没有 checkpoint 的框架只能强制终止并重启。

Q: asyncio 的死锁和线程死锁有什么区别? A: 线程死锁是多个线程互相持有对方需要的 mutex。asyncio 的死锁通常是协程互相 await 对方的 Future,或者在事件循环里调用了阻塞函数导致整个循环挂起。诊断工具不同:线程死锁用 threadingdeadlock 检测,asyncio 用 asyncio.get_event_loop().slow_callback_durationaiomonitor

Q: 如何区分死锁和系统只是很慢? A: 死锁的特征是 CPU 使用率为 0 且没有任何进展。用 ps auxtop 检查进程的 CPU 使用率:如果长时间接近 0%,且没有网络 I/O,就是死锁而不是慢。另一个方法是检查 asyncio 的运行协程数,死锁时会有多个协程都在等待 Future.result()

相关阅读

标签: #AI 编程 #Agents #排查