你的 CrewAI 流水线已经静止了 20 分钟,没有任何日志输出。查看 Trace 发现:Research Agent 在等待 Validator Agent 确认数据格式,而 Validator Agent 在等待 Research Agent 先提供样例数据——两者都在挂起状态。或者在 AutoGen 的 GroupChat 里,两个 Coder 都在等对方先写共享接口定义,谁也不肯先动。这类循环依赖在 LangGraph、Temporal 的复杂工作流里同样容易出现,尤其当任务依赖关系是在运行时动态生成的时候。
常见原因
1. 依赖图存在循环
最直接的原因。Agent A 的 task 依赖 Agent B 的输出,Agent B 的 task 又依赖 Agent A 的输出。在静态 DAG 框架(LangGraph)里这会被拓扑排序检测到,但在动态生成任务依赖的场景里(如 AutoGen 的函数调用链),循环依赖很容易被忽视。
怎么判断:把所有 Agent 的输入依赖画成有向图,用拓扑排序检测是否有环。如果有环,就是死锁根因。
2. 资源锁的获取顺序不一致
Agent A 先获取文件锁 X 再请求锁 Y,Agent B 先获取锁 Y 再请求锁 X。这是经典的「哲学家就餐」问题在 Agent 编排里的复现。
怎么判断:记录每个 Agent 获取锁的顺序和时间戳。如果两个 Agent 持有对方需要的锁,且都在等待,就是锁顺序问题。
3. 消息通道满了导致阻塞
在使用有界队列的框架里(如自定义的 asyncio.Queue 或 Redis list),发送方填满了队列后阻塞等待消费者读取,而消费者本身也在等待发送方的另一条消息,形成双向阻塞。
怎么判断:检查消息队列的当前长度。如果发送方的队列满了(length == maxsize),且接收方也在等待,就是这个问题。
4. 同步调用嵌套在异步上下文里
在 async 函数里用 asyncio.run() 调用另一个协程,或者在事件循环线程里调用阻塞 I/O,导致事件循环被占用,其他等待中的协程无法推进。
怎么判断:用 asyncio.get_event_loop().is_running() 检查,或者用 py-spy 采样堆栈,看是否有协程在 asyncio.run() 里阻塞。
5. 人工审批节点没有超时
Temporal 或 Inngest 里的 Human-in-the-loop 节点等待人工确认,但没有设置超时。如果审批人没有响应,后续所有依赖这个节点输出的 Agent 都会无限等待。
怎么判断:检查 Human approval 节点的配置,是否有 timeout 或 deadline 设置。
6. 双向 handoff 协议设计错误
两个 Agent 都实现了「先等对方发 ready 信号再开始」的协议,但没有规定谁先发。两者都在等「ready」信号,但都没有发出「ready」。
怎么判断:查看两个 Agent 的 handoff 协议代码,是否存在双方都在 wait_for_ready() 而没有 send_ready() 的逻辑。
最短修复路径
Step 1:加全局超时,避免无限等待
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def agent_timeout(seconds: int, task_name: str):
"""对任何 Agent 调用加超时保护。"""
try:
async with asyncio.timeout(seconds):
yield
except asyncio.TimeoutError:
raise DeadlockSuspected(
f"任务 '{task_name}' 超过 {seconds}s 无响应,疑似死锁"
)
# 使用
async with agent_timeout(300, "research_agent"):
result = await research_agent.run(task)
Step 2:运行前做依赖图拓扑排序
from collections import defaultdict, deque
def detect_cycle(dependencies: dict[str, list[str]]) -> list[str]:
"""Kahn 算法检测有向图中的环,返回造成环的节点列表。"""
in_degree = defaultdict(int)
for node, deps in dependencies.items():
for dep in deps:
in_degree[node] += 1
queue = deque([n for n in dependencies if in_degree[n] == 0])
visited = 0
while queue:
node = queue.popleft()
visited += 1
for neighbor in dependencies.get(node, []):
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if visited < len(dependencies):
cycle_nodes = [n for n in dependencies if in_degree[n] > 0]
return cycle_nodes
return []
# 在 workflow 启动前检测
deps = {
"research": ["validator"], # research 等 validator
"validator": ["research"], # validator 等 research —— 这是环!
}
cycle = detect_cycle(deps)
if cycle:
raise ValueError(f"依赖图有环,涉及节点:{cycle}")
Step 3:统一锁的获取顺序
import threading
# 定义全局锁的规范获取顺序(按名称排序)
LOCK_REGISTRY: dict[str, threading.Lock] = {}
def acquire_locks_in_order(*lock_names: str):
"""按规范顺序获取多个锁,防止死锁。"""
sorted_names = sorted(lock_names) # 所有人按字母序获取
locks = [LOCK_REGISTRY.setdefault(name, threading.Lock()) for name in sorted_names]
acquired = []
try:
for lock in locks:
lock.acquire(timeout=10)
acquired.append(lock)
yield
finally:
for lock in reversed(acquired):
lock.release()
Step 4:Temporal Human-approval 节点加 deadline
# Temporal workflow
from temporalio import workflow
from datetime import timedelta
@workflow.defn
class ApprovalWorkflow:
@workflow.run
async def run(self, request: ApprovalRequest) -> str:
try:
# 设置 72 小时审批超时
result = await workflow.wait_condition(
lambda: self._approved is not None,
timeout=timedelta(hours=72)
)
return self._approved
except asyncio.TimeoutError:
# 超时后自动拒绝或走默认路径
return "auto_rejected_timeout"
Step 5:用 asyncio.gather 打破双向等待
# 两个 Agent 互相等待时,改为并发启动,各自独立运行
results = await asyncio.gather(
agent_a.run_without_waiting_for_b(),
agent_b.run_without_waiting_for_a(),
return_exceptions=True
)
# 两者都完成后再进行协调
coordinator.reconcile(results[0], results[1])
预防建议
- 所有 workflow 启动前运行拓扑排序检测,有环直接拒绝启动
- 为每个 Agent 调用设置显式超时(建议生产环境不超过 10 分钟),超时后触发告警而不是无限等待
- 定义全局锁获取规范顺序(如按资源 ID 字母序),所有 Agent 必须遵守这个顺序
- Human-in-the-loop 节点必须设置超时和默认降级路径,不能无限等待人工响应
- 在 Trace 里监控「等待时间」指标,超过阈值(如 60 秒)时发出告警
- 避免在 Agent 协议里设计双向握手(A 等 B 的 ready,B 也等 A 的 ready),改用单向触发(A 先启动,完成后通知 B)
- 定期审查 Agent 依赖关系图,防止功能迭代中引入新的循环依赖
常见问答 (FAQ)
Q: LangGraph 会自动检测死锁吗?
A: LangGraph 的 compile() 方法会对静态定义的 edge 做拓扑排序,能检测显式定义的循环依赖。但如果依赖关系是通过 conditional edge 在运行时动态决定的,LangGraph 无法提前检测。这种情况需要在代码层加超时保护。
Q: 死锁发生时如何不重启整个 workflow 就恢复?
A: 如果用了 Temporal 或 LangGraph 的 checkpoint,可以识别出卡住的节点,手动发送一个「打破等待」的信号(如 Temporal 的 signal),让其中一个节点以空输入继续执行,然后在 coordinator 里做异常处理。没有 checkpoint 的框架只能强制终止并重启。
Q: asyncio 的死锁和线程死锁有什么区别?
A: 线程死锁是多个线程互相持有对方需要的 mutex。asyncio 的死锁通常是协程互相 await 对方的 Future,或者在事件循环里调用了阻塞函数导致整个循环挂起。诊断工具不同:线程死锁用 threading 的 deadlock 检测,asyncio 用 asyncio.get_event_loop().slow_callback_duration 或 aiomonitor。
Q: 如何区分死锁和系统只是很慢?
A: 死锁的特征是 CPU 使用率为 0 且没有任何进展。用 ps aux 或 top 检查进程的 CPU 使用率:如果长时间接近 0%,且没有网络 I/O,就是死锁而不是慢。另一个方法是检查 asyncio 的运行协程数,死锁时会有多个协程都在等待 Future.result()。