不稳定 tool 触发 Agent 重试风暴

一个偶发失败的工具调用触发 Agent 框架的自动重试机制,指数退避叠加并行 Agent,导致 API 用量和成本在分钟内暴涨。本文分析重试风暴的放大机制并给出熔断和限速方案。

你的 LangGraph 流水线里,一个外部 API 工具开始以 30% 的概率返回 503,Agent 框架自动重试 3 次,每次重试都带上了完整的对话历史(已经有 8K token)。10 个并行 Agent 实例同时触发了重试,5 分钟内 API 调用量从每分钟 50 次暴涨到 400 次,OpenAI 的 rate limit 被触发,整条流水线崩溃。更常见的场景是:CrewAI 的工具调用因为网络抖动失败,Agent 在 30 秒内重试了 6 次,每次重试的 prompt 越来越长(包含了每次失败的错误信息),最终一次 LLM 调用用了 20K token。

常见原因

1. 重试没有 jitter(抖动),多个 Agent 同步重试

所有并行 Agent 同时遇到工具失败,同时触发重试,同时在 2 秒后再次请求——引发「惊群效应」,把原本偶发的失败变成持续的过载。

怎么判断:查看工具调用日志的时间戳,如果多个 Agent 的重试请求集中在同一秒,就是缺少 jitter。

2. 失败的错误信息被加入上下文,prompt 随重试次数增长

Agent 把工具调用失败的错误信息追加到对话历史里,下次重试时把这些错误信息也发给 LLM。重试 5 次后,prompt 里有 5 条错误信息,token 数大幅增加,成本放大。

怎么判断:比较第 1 次重试和第 5 次重试的 prompt token 数,如果后者显著大于前者,就是错误信息累积问题。

3. 重试次数没有全局上限,只有单次调用的重试上限

工具调用重试 3 次失败后,Agent 认为任务失败,重新规划,然后再次调用同一个工具,再次重试 3 次——逻辑上是「重试的重试」,总调用次数远超预期。

怎么判断:统计工具的总调用次数(包括所有重试),如果远超 max_retries 的设置,就是有嵌套重试。

4. 工具失败被框架自动重试,同时 Agent 也在手动重试

框架层(如 LangChain 的 RetryOutputParser)有自动重试,Agent 的逻辑里也有手动重试,两层重试叠加,实际重试次数是两者的乘积。

怎么判断:检查代码里是否同时存在框架级重试配置和业务代码里的 for i in range(max_retries) 重试循环。

5. 工具本身的问题不可通过重试解决

工具返回 400 Bad Request(客户端参数错误)或 403 Forbidden(权限不足),这类错误重试多少次都不会成功。但框架按照「所有错误都重试」的策略,持续触发无效请求。

怎么判断:检查重试失败的 HTTP 状态码。4xx 错误(除 429 外)通常是不可重试的,不应该触发重试。

6. 指数退避的上限设置过短

退避上限设为 10 秒,但工具的恢复时间通常需要 2-3 分钟。多次退避到上限后,Agent 仍然以 10 秒间隔持续请求,对已经过载的服务造成持续压力。

怎么判断:查看退避配置的 max_wait 参数,与工具服务的典型恢复时间对比。

最短修复路径

Step 1:实现带 jitter 的指数退避

import random, asyncio
from tenacity import retry, wait_exponential_jitter, stop_after_attempt, retry_if_exception_type

@retry(
    wait=wait_exponential_jitter(initial=1, max=60, jitter=2),  # jitter 防止惊群
    stop=stop_after_attempt(3),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),  # 只重试网络错误
    reraise=True
)
async def call_tool_with_retry(tool_fn, *args, **kwargs):
    return await tool_fn(*args, **kwargs)

# 不重试 4xx 错误(客户端错误,重试无意义)
def should_retry(exception: Exception) -> bool:
    if hasattr(exception, 'status_code'):
        return exception.status_code not in (400, 401, 403, 404, 422)
    return isinstance(exception, (ConnectionError, TimeoutError))

Step 2:实现熔断器,防止持续请求失败服务

from circuitbreaker import circuit

@circuit(
    failure_threshold=5,       # 连续 5 次失败后开启熔断
    recovery_timeout=60,       # 60 秒后尝试恢复(半开状态)
    expected_exception=Exception
)
async def call_external_api(payload: dict) -> dict:
    response = await http_client.post("/api/endpoint", json=payload)
    response.raise_for_status()
    return response.json()

# 熔断器开启时,调用立即抛出 CircuitBreakerError,不发起网络请求
# 这样就不会在服务已经故障时继续堆积请求

Step 3:工具失败时不把错误信息加入对话历史

def handle_tool_error(tool_name: str, error: Exception, state: dict) -> dict:
    """工具失败时,只记录简短的失败标记,不把完整错误堆栈加入上下文。"""
    # 错误做法:把完整错误追加到消息历史(每次重试 token 数增加)
    # state["messages"].append({"role": "tool", "content": str(error)})
    
    # 正确做法:用专用的失败状态字段,不污染消息历史
    state["tool_failures"] = state.get("tool_failures", {})
    state["tool_failures"][tool_name] = {
        "count": state["tool_failures"].get(tool_name, {}).get("count", 0) + 1,
        "last_error": type(error).__name__,  # 只记录错误类型,不记录完整信息
        "last_attempt": datetime.utcnow().isoformat()
    }
    return state

Step 4:区分可重试错误和不可重试错误

class RetryPolicy:
    RETRYABLE_STATUS = {429, 500, 502, 503, 504}
    NON_RETRYABLE_STATUS = {400, 401, 403, 404, 422}
    
    @classmethod
    def should_retry(cls, error: Exception) -> bool:
        if hasattr(error, 'response') and error.response is not None:
            return error.response.status_code in cls.RETRYABLE_STATUS
        return isinstance(error, (ConnectionError, TimeoutError, asyncio.TimeoutError))
    
    @classmethod
    def get_retry_after(cls, error: Exception) -> int:
        """从 429 响应的 Retry-After 头里读取等待时间。"""
        if hasattr(error, 'response') and error.response:
            retry_after = error.response.headers.get('Retry-After', '0')
            return int(retry_after)
        return 0

Step 5:为工具调用设置全局令牌桶限速

import asyncio
from collections import deque

class TokenBucket:
    """令牌桶限速器,控制工具调用的全局速率。"""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate        # 每秒补充令牌数
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = asyncio.get_event_loop().time()
    
    async def acquire(self):
        now = asyncio.get_event_loop().time()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
        
        if self.tokens < 1:
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
            self.tokens = 0
        else:
            self.tokens -= 1

# 全局工具调用限速:最多每秒 10 次
tool_rate_limiter = TokenBucket(rate=10, capacity=10)

async def rate_limited_tool_call(tool_fn, *args, **kwargs):
    await tool_rate_limiter.acquire()
    return await tool_fn(*args, **kwargs)

预防建议

  • 所有工具重试必须加 jitter(随机抖动),防止多 Agent 同步重试引发惊群效应
  • 区分 4xx 和 5xx 错误的重试策略:4xx 大多数情况不应重试,5xx 和网络错误才重试
  • 为每个工具设置全局调用速率上限(令牌桶),无论有多少 Agent 并行,都共用同一个速率限制
  • 熔断器是必须的:工具连续失败 N 次后自动开启熔断,给下游服务恢复时间
  • 工具失败信息不加入 LLM 对话历史,防止每次重试的 token 数递增
  • 在 Prometheus 或 Datadog 里监控工具调用的重试率,超过 20% 时告警
  • 为每个工具调用链路设置一个「最大总 token 预算」,超过预算直接失败而不是无限重试

常见问答 (FAQ)

Q: tenacitywait_exponential_jitter 和普通的 wait_exponential 有多大区别? A: 在单 Agent 场景差别不大。但在 10 个并行 Agent 同时失败时,wait_exponential 会让所有 Agent 在同一时刻(如 2 秒后)同时重试,形成请求波峰。wait_exponential_jitter 会在基础等待时间上加 ±jitter 秒的随机偏移,把 10 个重试请求分散到 2-4 秒的时间窗口里,显著降低峰值压力。

Q: 熔断器开启期间,依赖这个工具的 Agent 应该怎么处理? A: 三个选项:1)立即失败并通知用户「服务暂时不可用」;2)降级到备用工具或缓存结果;3)把任务放入等待队列,等熔断器恢复后继续处理。选哪个取决于业务的容错要求。推荐至少实现选项 1,防止任务无限等待。

Q: 如何区分「工具本身故障」和「Agent 调用参数错误」导致的失败? A: 通过 HTTP 状态码区分:5xx 是服务器错误(工具故障),4xx 是客户端错误(参数问题)。对于 400/422 错误,不应重试,应该分析参数错误并修复 prompt 或工具调用逻辑。记录工具调用的请求参数(脱敏后),方便后续分析参数错误的根因。

Q: 重试风暴已经发生,如何快速止损? A: 立即执行:1)在负载均衡或 API 网关层启用全局限速,把到达后端的请求速率强制降到正常水平;2)如果在 LangGraph/Temporal,暂停所有 workflow 实例;3)检查重试代码,临时把 max_retries 改为 1 并重新部署。事后再修复根本原因。

相关阅读

标签: #AI 编程 #Agents #排查