你的 AutoGen 系统里有 5 个并行 Agent,其中「数据分析」Agent 在处理大数据集时在 60 秒内发出了 800 次 GPT-4o API 请求,触发了 OpenAI 的 RPM(每分钟请求数)限制。由于所有 Agent 共用同一个 OPENAI_API_KEY,其他 4 个 Agent 也开始收到 429 错误,重试逻辑叠加,整个系统每分钟的请求数从 800 涨到 2000(全部是重试),rate limit 持续时间从原本的 2 分钟变成了 30 分钟。Anthropic 的限额是按 organization + model + tier 维度的,LangGraph 的多并发 workflow 共享同一个 token 配额,任何一个 workflow 的爆发都会影响所有正在运行的 workflow。
常见原因
1. 所有 Agent 共用同一个 API key,限额是全局的
OpenAI 和 Anthropic 的 rate limit 按 API key(或 organization)维度计算,不区分哪个 Agent 发出的请求。一个 Agent 用完限额,所有 Agent 都被限速。
怎么判断:查看代码里的 API key 配置,如果所有 Agent 用同一个 key,就存在这个风险。
2. 没有按 Agent 角色分配独立的请求配额
系统里的 high-priority Agent(如处理用户实时请求的 Agent)和 low-priority Agent(如离线批处理 Agent)共用同一个 rate limit 池。批处理 Agent 的爆发会影响实时 Agent 的响应延迟。
怎么判断:检查是否为不同优先级的 Agent 设置了独立的速率预算(如 high-priority 最多占用 70% 的 RPM,low-priority 最多 30%)。
3. 重试逻辑没有感知全局 rate limit 状态
每个 Agent 独立实现重试逻辑,触发 429 后各自等待 1 秒再重试。5 个 Agent 同时重试,全局请求速率翻倍,rate limit 持续时间成倍延长。这是「重试放大」效应。
怎么判断:统计 rate limit 发生后,系统的实际请求速率是否显著高于触发 rate limit 之前的速率。如果是,就是重试放大在起作用。
4. 缺乏全局限速协调器
每个 Agent 各自维护一个局部的 token bucket 或 leaky bucket,但它们之间没有协调。Agent A 的桶里还有配额,Agent B 的桶里也有配额,两者同时请求,合计超过了全局限额。
怎么判断:把所有 Agent 的「本地允许速率」加总,如果超过 API key 的实际限额,就没有全局协调。
5. 批处理任务没有被限速,而是全速发送
CrewAI 的某个 Task 需要对 500 条数据各调用一次 LLM,代码里用了 asyncio.gather(*tasks) 并发发送,瞬间把 500 个请求同时打出去,直接触发 rate limit。
怎么判断:检查批处理代码里是否有并发控制(如 asyncio.Semaphore),如果是裸的 asyncio.gather,就没有限速。
6. Exponential backoff 的 base 时间计算错误,退避不足
wait = 2 ** retry_count(秒),retry_count 从 0 开始,第一次重试等 1 秒(2^0 = 1),第二次等 2 秒(2^1 = 2)。但 OpenAI 的 rate limit reset 窗口是 60 秒,退避时间远小于 reset 时间,Agent 会在 reset 完成前持续打入大量无效请求。
怎么判断:检查退避计算公式,确认在 rate limit 持续期间,总重试次数乘以请求速率是否仍然超过限额。
最短修复路径
Step 1:实现全局共享的速率限制器
import asyncio
from collections import deque
from typing import ClassVar
class GlobalRateLimiter:
"""多 Agent 共享的全局速率限制器,防止任一 Agent 用完全局配额。"""
_instances: ClassVar[dict[str, "GlobalRateLimiter"]] = {}
def __new__(cls, api_key: str, rpm: int, tpm: int):
if api_key not in cls._instances:
instance = super().__new__(cls)
instance._init(rpm, tpm)
cls._instances[api_key] = instance
return cls._instances[api_key]
def _init(self, rpm: int, tpm: int):
self.rpm = rpm
self.tpm = tpm
self._request_times: deque = deque()
self._token_times: deque = deque()
self._lock = asyncio.Lock()
async def acquire(self, estimated_tokens: int = 1000):
async with self._lock:
now = asyncio.get_event_loop().time()
# 清理 60 秒前的记录
while self._request_times and now - self._request_times[0] > 60:
self._request_times.popleft()
while self._token_times and now - self._token_times[0][0] > 60:
self._token_times.popleft()
current_rpm = len(self._request_times)
current_tpm = sum(t for _, t in self._token_times)
# 如果接近限额,等待到窗口重置
if current_rpm >= self.rpm * 0.9: # 留 10% buffer
wait = 60 - (now - self._request_times[0])
await asyncio.sleep(wait + 0.5)
elif current_tpm + estimated_tokens >= self.tpm * 0.9:
wait = 60 - (now - self._token_times[0][0])
await asyncio.sleep(wait + 0.5)
self._request_times.append(asyncio.get_event_loop().time())
self._token_times.append((asyncio.get_event_loop().time(), estimated_tokens))
# 使用:所有 Agent 共享同一个限速器实例
limiter = GlobalRateLimiter(api_key=os.environ["OPENAI_API_KEY"], rpm=500, tpm=800000)
async def rate_limited_call(messages, estimated_tokens=2000):
await limiter.acquire(estimated_tokens)
return await openai_client.chat.completions.create(messages=messages)
Step 2:为不同优先级的 Agent 分配独立配额份额
class PriorityRateLimiter:
"""按优先级分配 rate limit 配额。"""
PRIORITY_ALLOCATION = {
"high": 0.7, # 高优先级占 70%(用户实时请求)
"low": 0.25, # 低优先级占 25%(离线批处理)
"system": 0.05 # 系统保留 5%(心跳、监控)
}
def get_priority_rpm(self, priority: str, total_rpm: int) -> int:
return int(total_rpm * self.PRIORITY_ALLOCATION.get(priority, 0.1))
Step 3:批处理任务加并发控制
async def batch_llm_calls(items: list, max_concurrent: int = 10):
"""批量 LLM 调用,用 Semaphore 控制并发数,防止瞬时爆发。"""
semaphore = asyncio.Semaphore(max_concurrent)
async def call_with_limit(item):
async with semaphore:
await asyncio.sleep(0.1) # 100ms 间隔,最多 10 req/s
return await rate_limited_call(item)
return await asyncio.gather(*[call_with_limit(item) for item in items])
Step 4:在 429 响应里读取 Retry-After,不要自行猜测等待时间
async def call_with_rate_limit_awareness(messages: list) -> dict:
"""遵守服务器指示的等待时间,而不是自行指数退避。"""
for attempt in range(5):
try:
return await openai_client.chat.completions.create(
model="gpt-4o", messages=messages
)
except openai.RateLimitError as e:
# 从响应头里读取实际的等待时间
retry_after = int(e.response.headers.get("retry-after", 60))
# 加 jitter 防止惊群
wait = retry_after + random.uniform(0, 5)
print(f"Rate limited,等待 {wait:.1f}s(attempt {attempt+1}/5)")
await asyncio.sleep(wait)
raise RuntimeError("Rate limit 重试 5 次后仍然失败")
Step 5:监控 rate limit 发生率,提前预警
# Prometheus 查询:过去 5 分钟的 429 错误率
rate(llm_api_errors_total{status="429"}[5m]) / rate(llm_api_calls_total[5m])
# 超过 5% 时告警
预防建议
- 所有 Agent 通过全局共享的速率限制器发出 API 请求,不允许各自独立限速
- 按业务优先级分配 rate limit 配额份额,实时业务优先级高于批处理业务
- 批处理任务用
asyncio.Semaphore控制并发,最大并发数根据 API key 的 RPM 限额计算 - 触发 429 时读取
Retry-After头,而不是自行猜测等待时间 - 为 rate limit 错误建立专用监控告警,429 比率超过 5% 时触发
- 考虑使用多个 API key(分属不同的 OpenAI organization),为不同优先级的 Agent 分配不同的 key
- 在 Anthropic/OpenAI 控制台申请更高的 rate limit 层级,不要用工程手段弥补配额不足的根本问题
常见问答 (FAQ)
Q: OpenAI 和 Anthropic 的 rate limit 分别是按什么维度计算的?
A: OpenAI 的 rate limit 按 organization + model 维度计算,同一 organization 下的所有 API key 共享限额。Anthropic 的 rate limit 按 API key(或 workspace)维度计算,不同 API key 的限额相互独立。在 Anthropic 上可以为不同的业务线创建不同的 workspace,实现配额隔离。
Q: 多 API key 方案会带来哪些管理复杂度? A: 主要是成本汇总和密钥轮换的复杂度。推荐用 LiteLLM proxy 统一管理多个 key,proxy 内部做 key 轮换和负载均衡,Agent 只看到一个 proxy endpoint。这样既实现了 key 隔离,又保持了管理简单性。
Q: 已经触发了 rate limit,如何最快恢复?
A: 立即停止所有自动重试(避免重试放大延长 rate limit 时间);等待 Retry-After 头指定的时间(通常 60s);恢复后先以 50% 的速率运行 5 分钟,确认稳定后再逐步恢复全速。不要在 rate limit 恢复后立即全速重试,否则可能立即再次触发。
Q: 如何预估一个新 workflow 上线后是否会触发 rate limit? A: 在 staging 环境用 10% 的流量运行新 workflow,测量它的 RPM 和 TPM 消耗。然后线性外推:全量上线后的预期消耗 = staging 测量值 × 10。如果预期消耗超过当前 API key 剩余配额的 80%,就需要申请更高配额或优化 workflow 的请求密度。