一个 Agent 触发 rate limit 拖垮整链

多 Agent 系统中某一个 Agent 的爆发性请求触发 API rate limit,导致共用同一 API key 的其他 Agent 全部被限速,整条流水线雪崩。本文分析级联失败的传导路径并给出隔离和限速方案。

你的 AutoGen 系统里有 5 个并行 Agent,其中「数据分析」Agent 在处理大数据集时在 60 秒内发出了 800 次 GPT-4o API 请求,触发了 OpenAI 的 RPM(每分钟请求数)限制。由于所有 Agent 共用同一个 OPENAI_API_KEY,其他 4 个 Agent 也开始收到 429 错误,重试逻辑叠加,整个系统每分钟的请求数从 800 涨到 2000(全部是重试),rate limit 持续时间从原本的 2 分钟变成了 30 分钟。Anthropic 的限额是按 organization + model + tier 维度的,LangGraph 的多并发 workflow 共享同一个 token 配额,任何一个 workflow 的爆发都会影响所有正在运行的 workflow。

常见原因

1. 所有 Agent 共用同一个 API key,限额是全局的

OpenAI 和 Anthropic 的 rate limit 按 API key(或 organization)维度计算,不区分哪个 Agent 发出的请求。一个 Agent 用完限额,所有 Agent 都被限速。

怎么判断:查看代码里的 API key 配置,如果所有 Agent 用同一个 key,就存在这个风险。

2. 没有按 Agent 角色分配独立的请求配额

系统里的 high-priority Agent(如处理用户实时请求的 Agent)和 low-priority Agent(如离线批处理 Agent)共用同一个 rate limit 池。批处理 Agent 的爆发会影响实时 Agent 的响应延迟。

怎么判断:检查是否为不同优先级的 Agent 设置了独立的速率预算(如 high-priority 最多占用 70% 的 RPM,low-priority 最多 30%)。

3. 重试逻辑没有感知全局 rate limit 状态

每个 Agent 独立实现重试逻辑,触发 429 后各自等待 1 秒再重试。5 个 Agent 同时重试,全局请求速率翻倍,rate limit 持续时间成倍延长。这是「重试放大」效应。

怎么判断:统计 rate limit 发生后,系统的实际请求速率是否显著高于触发 rate limit 之前的速率。如果是,就是重试放大在起作用。

4. 缺乏全局限速协调器

每个 Agent 各自维护一个局部的 token bucket 或 leaky bucket,但它们之间没有协调。Agent A 的桶里还有配额,Agent B 的桶里也有配额,两者同时请求,合计超过了全局限额。

怎么判断:把所有 Agent 的「本地允许速率」加总,如果超过 API key 的实际限额,就没有全局协调。

5. 批处理任务没有被限速,而是全速发送

CrewAI 的某个 Task 需要对 500 条数据各调用一次 LLM,代码里用了 asyncio.gather(*tasks) 并发发送,瞬间把 500 个请求同时打出去,直接触发 rate limit。

怎么判断:检查批处理代码里是否有并发控制(如 asyncio.Semaphore),如果是裸的 asyncio.gather,就没有限速。

6. Exponential backoff 的 base 时间计算错误,退避不足

wait = 2 ** retry_count(秒),retry_count 从 0 开始,第一次重试等 1 秒(2^0 = 1),第二次等 2 秒(2^1 = 2)。但 OpenAI 的 rate limit reset 窗口是 60 秒,退避时间远小于 reset 时间,Agent 会在 reset 完成前持续打入大量无效请求。

怎么判断:检查退避计算公式,确认在 rate limit 持续期间,总重试次数乘以请求速率是否仍然超过限额。

最短修复路径

Step 1:实现全局共享的速率限制器

import asyncio
from collections import deque
from typing import ClassVar

class GlobalRateLimiter:
    """多 Agent 共享的全局速率限制器,防止任一 Agent 用完全局配额。"""
    _instances: ClassVar[dict[str, "GlobalRateLimiter"]] = {}
    
    def __new__(cls, api_key: str, rpm: int, tpm: int):
        if api_key not in cls._instances:
            instance = super().__new__(cls)
            instance._init(rpm, tpm)
            cls._instances[api_key] = instance
        return cls._instances[api_key]
    
    def _init(self, rpm: int, tpm: int):
        self.rpm = rpm
        self.tpm = tpm
        self._request_times: deque = deque()
        self._token_times: deque = deque()
        self._lock = asyncio.Lock()
    
    async def acquire(self, estimated_tokens: int = 1000):
        async with self._lock:
            now = asyncio.get_event_loop().time()
            
            # 清理 60 秒前的记录
            while self._request_times and now - self._request_times[0] > 60:
                self._request_times.popleft()
            while self._token_times and now - self._token_times[0][0] > 60:
                self._token_times.popleft()
            
            current_rpm = len(self._request_times)
            current_tpm = sum(t for _, t in self._token_times)
            
            # 如果接近限额,等待到窗口重置
            if current_rpm >= self.rpm * 0.9:  # 留 10% buffer
                wait = 60 - (now - self._request_times[0])
                await asyncio.sleep(wait + 0.5)
            elif current_tpm + estimated_tokens >= self.tpm * 0.9:
                wait = 60 - (now - self._token_times[0][0])
                await asyncio.sleep(wait + 0.5)
            
            self._request_times.append(asyncio.get_event_loop().time())
            self._token_times.append((asyncio.get_event_loop().time(), estimated_tokens))

# 使用:所有 Agent 共享同一个限速器实例
limiter = GlobalRateLimiter(api_key=os.environ["OPENAI_API_KEY"], rpm=500, tpm=800000)

async def rate_limited_call(messages, estimated_tokens=2000):
    await limiter.acquire(estimated_tokens)
    return await openai_client.chat.completions.create(messages=messages)

Step 2:为不同优先级的 Agent 分配独立配额份额

class PriorityRateLimiter:
    """按优先级分配 rate limit 配额。"""
    PRIORITY_ALLOCATION = {
        "high": 0.7,    # 高优先级占 70%(用户实时请求)
        "low": 0.25,    # 低优先级占 25%(离线批处理)
        "system": 0.05  # 系统保留 5%(心跳、监控)
    }
    
    def get_priority_rpm(self, priority: str, total_rpm: int) -> int:
        return int(total_rpm * self.PRIORITY_ALLOCATION.get(priority, 0.1))

Step 3:批处理任务加并发控制

async def batch_llm_calls(items: list, max_concurrent: int = 10):
    """批量 LLM 调用,用 Semaphore 控制并发数,防止瞬时爆发。"""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def call_with_limit(item):
        async with semaphore:
            await asyncio.sleep(0.1)  # 100ms 间隔,最多 10 req/s
            return await rate_limited_call(item)
    
    return await asyncio.gather(*[call_with_limit(item) for item in items])

Step 4:在 429 响应里读取 Retry-After,不要自行猜测等待时间

async def call_with_rate_limit_awareness(messages: list) -> dict:
    """遵守服务器指示的等待时间,而不是自行指数退避。"""
    for attempt in range(5):
        try:
            return await openai_client.chat.completions.create(
                model="gpt-4o", messages=messages
            )
        except openai.RateLimitError as e:
            # 从响应头里读取实际的等待时间
            retry_after = int(e.response.headers.get("retry-after", 60))
            # 加 jitter 防止惊群
            wait = retry_after + random.uniform(0, 5)
            print(f"Rate limited,等待 {wait:.1f}s(attempt {attempt+1}/5)")
            await asyncio.sleep(wait)
    raise RuntimeError("Rate limit 重试 5 次后仍然失败")

Step 5:监控 rate limit 发生率,提前预警

# Prometheus 查询:过去 5 分钟的 429 错误率
rate(llm_api_errors_total{status="429"}[5m]) / rate(llm_api_calls_total[5m])
# 超过 5% 时告警

预防建议

  • 所有 Agent 通过全局共享的速率限制器发出 API 请求,不允许各自独立限速
  • 按业务优先级分配 rate limit 配额份额,实时业务优先级高于批处理业务
  • 批处理任务用 asyncio.Semaphore 控制并发,最大并发数根据 API key 的 RPM 限额计算
  • 触发 429 时读取 Retry-After 头,而不是自行猜测等待时间
  • 为 rate limit 错误建立专用监控告警,429 比率超过 5% 时触发
  • 考虑使用多个 API key(分属不同的 OpenAI organization),为不同优先级的 Agent 分配不同的 key
  • 在 Anthropic/OpenAI 控制台申请更高的 rate limit 层级,不要用工程手段弥补配额不足的根本问题

常见问答 (FAQ)

Q: OpenAI 和 Anthropic 的 rate limit 分别是按什么维度计算的? A: OpenAI 的 rate limit 按 organization + model 维度计算,同一 organization 下的所有 API key 共享限额。Anthropic 的 rate limit 按 API key(或 workspace)维度计算,不同 API key 的限额相互独立。在 Anthropic 上可以为不同的业务线创建不同的 workspace,实现配额隔离。

Q: 多 API key 方案会带来哪些管理复杂度? A: 主要是成本汇总和密钥轮换的复杂度。推荐用 LiteLLM proxy 统一管理多个 key,proxy 内部做 key 轮换和负载均衡,Agent 只看到一个 proxy endpoint。这样既实现了 key 隔离,又保持了管理简单性。

Q: 已经触发了 rate limit,如何最快恢复? A: 立即停止所有自动重试(避免重试放大延长 rate limit 时间);等待 Retry-After 头指定的时间(通常 60s);恢复后先以 50% 的速率运行 5 分钟,确认稳定后再逐步恢复全速。不要在 rate limit 恢复后立即全速重试,否则可能立即再次触发。

Q: 如何预估一个新 workflow 上线后是否会触发 rate limit? A: 在 staging 环境用 10% 的流量运行新 workflow,测量它的 RPM 和 TPM 消耗。然后线性外推:全量上线后的预期消耗 = staging 测量值 × 10。如果预期消耗超过当前 API key 剩余配额的 80%,就需要申请更高配额或优化 workflow 的请求密度。

相关阅读

标签: #AI 编程 #Agents #排查