重启后 Agent 状态对不上

Agent 重启或恢复后,内存中的状态与持久化存储的状态出现偏差,导致重复执行、跳过步骤或使用过期数据。本文拆解状态漂移根因并给出持久化与校验方案。

你的 LangGraph workflow 在第 15 步崩溃后重启,框架从 checkpoint 恢复,但 Agent 用的是重启前内存里的「已处理文件列表」,而 checkpoint 里记录的是另一个版本——两个来源的数据不一致,Agent 重新处理了已经完成的文件,又跳过了崩溃时正在处理的文件。Temporal 里也有类似情况:worker 崩溃重启后,workflow 状态从持久化存储恢复,但 activity 的本地缓存变量丢失了,导致 activity 用空值重跑。这类状态漂移问题的共同特征是:重启后「说自己记得」的状态和「实际发生过的」状态之间有缺口。

常见原因

1. 部分状态在内存里,部分在持久化存储里

最常见的根因。Agent 把「已处理的 ID 集合」存在 Python 的 set 对象里(重启后消失),同时把「总体进度」存在 Redis 里(重启后保留)。重启后,Redis 显示任务 70% 完成,但内存里的 ID 集合是空的,Agent 重新开始处理所有 ID。

怎么判断:列出所有状态变量,标记每个变量的存储位置(内存/文件/Redis/数据库)。如果有任何运行时需要的变量只在内存里,重启后就会丢失。

2. Checkpoint 写入时机不对(写后崩溃 or 崩溃前未写)

Checkpoint 在每 10 步写一次,但 Agent 在第 7 步崩溃。重启后恢复到第 0 步的 checkpoint,重新执行 7 步已经完成的工作,可能产生副作用(如重复发邮件、重复写数据库)。

怎么判断:查看最近一次 checkpoint 的时间戳,与崩溃时间对比,计算丢失的步骤数。如果步骤数大于 0,就是写入粒度太粗。

3. 外部资源的状态没有纳入 checkpoint

Agent 创建了一个临时文件 /tmp/work_state.json,重启后这个文件可能被系统清理,或者在另一台 worker 上根本不存在。Checkpoint 里只有「任务进行中」的标记,没有文件的内容。

怎么判断:检查 Agent 依赖的所有外部资源(文件、临时目录、数据库行、外部 API session),确认它们在 checkpoint 里有对应的记录或能被重建。

4. 状态版本不兼容

代码升级后,新版本的状态 schema 和旧版本的 checkpoint 数据格式不匹配。state["new_field"] 在旧 checkpoint 里不存在,Agent 拿到 NoneKeyError,然后用错误的默认值继续执行。

怎么判断:比较当前代码里 WorkflowState 的 schema 与最新 checkpoint 的实际字段列表,检查是否有新增字段在旧 checkpoint 里缺失。

5. 并发写入导致 checkpoint 内容不一致

多个线程或进程同时更新同一个 checkpoint,写操作没有加锁,导致部分字段来自一个写入者、部分字段来自另一个写入者,checkpoint 处于不一致状态。

怎么判断:检查 checkpoint 文件或数据库记录的 updated_at 时间戳,是否存在同一毫秒内的多次写入。

6. 重启后没有做状态校验就直接继续执行

Agent 盲目信任 checkpoint 里的状态,不做任何有效性验证就继续执行。如果 checkpoint 本身是损坏或过期的(如来自不同环境的 checkpoint 被意外加载),Agent 会用错误的状态跑完整个 workflow。

怎么判断:检查恢复逻辑里是否有 validate_state() 调用。如果没有,就是盲目信任。

最短修复路径

Step 1:把所有运行时状态统一到单一持久化 store

from dataclasses import dataclass, field
from typing import Any
import json, pathlib

@dataclass
class AgentState:
    """所有状态字段,必须可序列化。禁止在这个类之外保存运行时状态。"""
    task_id: str
    processed_ids: list[str] = field(default_factory=list)
    current_step: int = 0
    total_steps: int = 0
    last_checkpoint_at: str = ""
    schema_version: int = 2          # 每次修改 schema 时递增
    
    def save(self, path: str):
        pathlib.Path(path).write_text(
            json.dumps(self.__dict__, ensure_ascii=False, indent=2)
        )
    
    @classmethod
    def load(cls, path: str) -> "AgentState":
        data = json.loads(pathlib.Path(path).read_text())
        # 版本迁移
        if data.get("schema_version", 1) < 2:
            data["processed_ids"] = data.pop("done_ids", [])
            data["schema_version"] = 2
        return cls(**data)

Step 2:每步执行后立即写 checkpoint(而不是每 N 步)

async def run_step(state: AgentState, step_fn, checkpoint_path: str):
    """执行一步并立即持久化,保证崩溃后最多丢失 1 步。"""
    result = await step_fn(state)
    state.current_step += 1
    state.last_checkpoint_at = datetime.utcnow().isoformat()
    state.save(checkpoint_path)      # 立即写,不要批量
    return result

Step 3:恢复时做完整性校验

def restore_and_validate(checkpoint_path: str) -> AgentState:
    state = AgentState.load(checkpoint_path)
    
    # 1. schema 版本检查
    assert state.schema_version == CURRENT_SCHEMA_VERSION, \
        f"checkpoint schema 版本 {state.schema_version} 与当前代码 {CURRENT_SCHEMA_VERSION} 不匹配"
    
    # 2. 外部资源存在性检查
    for item_id in state.processed_ids:
        assert db.exists(item_id), f"checkpoint 记录 {item_id} 已处理,但数据库里找不到对应记录"
    
    # 3. 步骤范围检查
    assert 0 <= state.current_step <= state.total_steps, \
        f"current_step={state.current_step} 超出合法范围 [0, {state.total_steps}]"
    
    return state

Step 4:用事务性更新防止并发写入不一致

import filelock

def atomic_save(state: AgentState, path: str):
    """原子性写入:先写临时文件,再原子重命名,防止写到一半崩溃。"""
    tmp_path = path + ".tmp"
    lock = filelock.FileLock(path + ".lock")
    with lock:
        pathlib.Path(tmp_path).write_text(
            json.dumps(state.__dict__, ensure_ascii=False)
        )
        pathlib.Path(tmp_path).rename(path)  # 原子操作

Step 5:在 Temporal 里用 workflow 参数传递状态,不用 activity 局部变量

# 错误:把状态放在 activity 的局部变量里(重启后丢失)
# processed = set()  # activity 局部变量
# 
# 正确:状态通过 workflow 的返回值链传递
@workflow.defn
class ProcessWorkflow:
    @workflow.run
    async def run(self, input: ProcessInput) -> ProcessResult:
        processed_ids = []
        for item in input.items:
            result = await workflow.execute_activity(
                process_item,
                ProcessItemInput(item_id=item.id, already_done=processed_ids),
                schedule_to_close_timeout=timedelta(minutes=5)
            )
            processed_ids.append(item.id)  # 状态在 workflow 层累积,Temporal 自动持久化
        return ProcessResult(processed_ids=processed_ids)

预防建议

  • 禁止在 Agent 代码里使用模块级全局变量保存运行时状态,所有状态必须通过 AgentState 对象持久化
  • 每次修改 AgentState schema 时递增版本号,并在 load() 里实现向后兼容的迁移逻辑
  • 用「每步写 checkpoint」替代「每 N 步写 checkpoint」,N 越大丢失的工作越多
  • 为所有 checkpoint 写原子性操作(tmp + rename),防止写到一半崩溃导致损坏
  • 在恢复逻辑里加业务层校验(不只是 schema 校验),确认 checkpoint 描述的历史与外部系统的实际状态一致
  • 在 staging 环境定期做「混沌测试」:随机 kill Agent 进程,验证恢复后的行为是否正确
  • Temporal/Inngest 场景:所有跨 activity 的状态通过 workflow 返回值链传递,不依赖 worker 本地内存

常见问答 (FAQ)

Q: LangGraph 的 MemorySaverSqliteSaver 在重启场景下有什么区别? A: MemorySaver 把 checkpoint 存在进程内存里,进程重启后全部丢失,只适用于测试。SqliteSaver 把 checkpoint 写入 SQLite 文件,重启后可以恢复。生产环境应使用 PostgresSaverRedisSaver,支持多 worker 共享 checkpoint。

Q: 如果 checkpoint 和外部数据库状态不一致,应该以哪个为准? A: 以外部数据库为准。数据库是事实来源(source of truth),checkpoint 只是执行进度的记录。恢复时,先从数据库查询哪些 item 已经被处理(通过唯一 ID + 处理状态字段),重建 processed_ids 列表,而不是直接信任 checkpoint 里的 processed_ids

Q: schema_version 迁移代码需要保留多久? A: 需要保留到所有生产环境的 checkpoint 都已经升级到新版本。实践上,保留 2 个大版本的迁移代码是常见做法。如果运维上能确保重启前先清理旧 checkpoint,迁移代码可以更早删除。

Q: 如何检测生产环境里是否存在状态漂移? A: 在每次 workflow 完成时,运行一个对账任务:对比 checkpoint 记录的「已处理 ID 数量」与数据库里实际的「处理完成记录数量」。如果两个数字不一致,就存在状态漂移,需要人工介入调查。

相关阅读

标签: #AI 编程 #Agents #排查