你的 LangGraph 流水线里,一个外部 API 工具开始以 30% 的概率返回 503,Agent 框架自动重试 3 次,每次重试都带上了完整的对话历史(已经有 8K token)。10 个并行 Agent 实例同时触发了重试,5 分钟内 API 调用量从每分钟 50 次暴涨到 400 次,OpenAI 的 rate limit 被触发,整条流水线崩溃。更常见的场景是:CrewAI 的工具调用因为网络抖动失败,Agent 在 30 秒内重试了 6 次,每次重试的 prompt 越来越长(包含了每次失败的错误信息),最终一次 LLM 调用用了 20K token。
常见原因
1. 重试没有 jitter(抖动),多个 Agent 同步重试
所有并行 Agent 同时遇到工具失败,同时触发重试,同时在 2 秒后再次请求——引发「惊群效应」,把原本偶发的失败变成持续的过载。
怎么判断:查看工具调用日志的时间戳,如果多个 Agent 的重试请求集中在同一秒,就是缺少 jitter。
2. 失败的错误信息被加入上下文,prompt 随重试次数增长
Agent 把工具调用失败的错误信息追加到对话历史里,下次重试时把这些错误信息也发给 LLM。重试 5 次后,prompt 里有 5 条错误信息,token 数大幅增加,成本放大。
怎么判断:比较第 1 次重试和第 5 次重试的 prompt token 数,如果后者显著大于前者,就是错误信息累积问题。
3. 重试次数没有全局上限,只有单次调用的重试上限
工具调用重试 3 次失败后,Agent 认为任务失败,重新规划,然后再次调用同一个工具,再次重试 3 次——逻辑上是「重试的重试」,总调用次数远超预期。
怎么判断:统计工具的总调用次数(包括所有重试),如果远超 max_retries 的设置,就是有嵌套重试。
4. 工具失败被框架自动重试,同时 Agent 也在手动重试
框架层(如 LangChain 的 RetryOutputParser)有自动重试,Agent 的逻辑里也有手动重试,两层重试叠加,实际重试次数是两者的乘积。
怎么判断:检查代码里是否同时存在框架级重试配置和业务代码里的 for i in range(max_retries) 重试循环。
5. 工具本身的问题不可通过重试解决
工具返回 400 Bad Request(客户端参数错误)或 403 Forbidden(权限不足),这类错误重试多少次都不会成功。但框架按照「所有错误都重试」的策略,持续触发无效请求。
怎么判断:检查重试失败的 HTTP 状态码。4xx 错误(除 429 外)通常是不可重试的,不应该触发重试。
6. 指数退避的上限设置过短
退避上限设为 10 秒,但工具的恢复时间通常需要 2-3 分钟。多次退避到上限后,Agent 仍然以 10 秒间隔持续请求,对已经过载的服务造成持续压力。
怎么判断:查看退避配置的 max_wait 参数,与工具服务的典型恢复时间对比。
最短修复路径
Step 1:实现带 jitter 的指数退避
import random, asyncio
from tenacity import retry, wait_exponential_jitter, stop_after_attempt, retry_if_exception_type
@retry(
wait=wait_exponential_jitter(initial=1, max=60, jitter=2), # jitter 防止惊群
stop=stop_after_attempt(3),
retry=retry_if_exception_type((ConnectionError, TimeoutError)), # 只重试网络错误
reraise=True
)
async def call_tool_with_retry(tool_fn, *args, **kwargs):
return await tool_fn(*args, **kwargs)
# 不重试 4xx 错误(客户端错误,重试无意义)
def should_retry(exception: Exception) -> bool:
if hasattr(exception, 'status_code'):
return exception.status_code not in (400, 401, 403, 404, 422)
return isinstance(exception, (ConnectionError, TimeoutError))
Step 2:实现熔断器,防止持续请求失败服务
from circuitbreaker import circuit
@circuit(
failure_threshold=5, # 连续 5 次失败后开启熔断
recovery_timeout=60, # 60 秒后尝试恢复(半开状态)
expected_exception=Exception
)
async def call_external_api(payload: dict) -> dict:
response = await http_client.post("/api/endpoint", json=payload)
response.raise_for_status()
return response.json()
# 熔断器开启时,调用立即抛出 CircuitBreakerError,不发起网络请求
# 这样就不会在服务已经故障时继续堆积请求
Step 3:工具失败时不把错误信息加入对话历史
def handle_tool_error(tool_name: str, error: Exception, state: dict) -> dict:
"""工具失败时,只记录简短的失败标记,不把完整错误堆栈加入上下文。"""
# 错误做法:把完整错误追加到消息历史(每次重试 token 数增加)
# state["messages"].append({"role": "tool", "content": str(error)})
# 正确做法:用专用的失败状态字段,不污染消息历史
state["tool_failures"] = state.get("tool_failures", {})
state["tool_failures"][tool_name] = {
"count": state["tool_failures"].get(tool_name, {}).get("count", 0) + 1,
"last_error": type(error).__name__, # 只记录错误类型,不记录完整信息
"last_attempt": datetime.utcnow().isoformat()
}
return state
Step 4:区分可重试错误和不可重试错误
class RetryPolicy:
RETRYABLE_STATUS = {429, 500, 502, 503, 504}
NON_RETRYABLE_STATUS = {400, 401, 403, 404, 422}
@classmethod
def should_retry(cls, error: Exception) -> bool:
if hasattr(error, 'response') and error.response is not None:
return error.response.status_code in cls.RETRYABLE_STATUS
return isinstance(error, (ConnectionError, TimeoutError, asyncio.TimeoutError))
@classmethod
def get_retry_after(cls, error: Exception) -> int:
"""从 429 响应的 Retry-After 头里读取等待时间。"""
if hasattr(error, 'response') and error.response:
retry_after = error.response.headers.get('Retry-After', '0')
return int(retry_after)
return 0
Step 5:为工具调用设置全局令牌桶限速
import asyncio
from collections import deque
class TokenBucket:
"""令牌桶限速器,控制工具调用的全局速率。"""
def __init__(self, rate: float, capacity: int):
self.rate = rate # 每秒补充令牌数
self.capacity = capacity
self.tokens = capacity
self.last_refill = asyncio.get_event_loop().time()
async def acquire(self):
now = asyncio.get_event_loop().time()
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens < 1:
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1
# 全局工具调用限速:最多每秒 10 次
tool_rate_limiter = TokenBucket(rate=10, capacity=10)
async def rate_limited_tool_call(tool_fn, *args, **kwargs):
await tool_rate_limiter.acquire()
return await tool_fn(*args, **kwargs)
预防建议
- 所有工具重试必须加 jitter(随机抖动),防止多 Agent 同步重试引发惊群效应
- 区分 4xx 和 5xx 错误的重试策略:4xx 大多数情况不应重试,5xx 和网络错误才重试
- 为每个工具设置全局调用速率上限(令牌桶),无论有多少 Agent 并行,都共用同一个速率限制
- 熔断器是必须的:工具连续失败 N 次后自动开启熔断,给下游服务恢复时间
- 工具失败信息不加入 LLM 对话历史,防止每次重试的 token 数递增
- 在 Prometheus 或 Datadog 里监控工具调用的重试率,超过 20% 时告警
- 为每个工具调用链路设置一个「最大总 token 预算」,超过预算直接失败而不是无限重试
常见问答 (FAQ)
Q: tenacity 的 wait_exponential_jitter 和普通的 wait_exponential 有多大区别?
A: 在单 Agent 场景差别不大。但在 10 个并行 Agent 同时失败时,wait_exponential 会让所有 Agent 在同一时刻(如 2 秒后)同时重试,形成请求波峰。wait_exponential_jitter 会在基础等待时间上加 ±jitter 秒的随机偏移,把 10 个重试请求分散到 2-4 秒的时间窗口里,显著降低峰值压力。
Q: 熔断器开启期间,依赖这个工具的 Agent 应该怎么处理? A: 三个选项:1)立即失败并通知用户「服务暂时不可用」;2)降级到备用工具或缓存结果;3)把任务放入等待队列,等熔断器恢复后继续处理。选哪个取决于业务的容错要求。推荐至少实现选项 1,防止任务无限等待。
Q: 如何区分「工具本身故障」和「Agent 调用参数错误」导致的失败? A: 通过 HTTP 状态码区分:5xx 是服务器错误(工具故障),4xx 是客户端错误(参数问题)。对于 400/422 错误,不应重试,应该分析参数错误并修复 prompt 或工具调用逻辑。记录工具调用的请求参数(脱敏后),方便后续分析参数错误的根因。
Q: 重试风暴已经发生,如何快速止损?
A: 立即执行:1)在负载均衡或 API 网关层启用全局限速,把到达后端的请求速率强制降到正常水平;2)如果在 LangGraph/Temporal,暂停所有 workflow 实例;3)检查重试代码,临时把 max_retries 改为 1 并重新部署。事后再修复根本原因。