Checkpoint 恢复出来的状态是损坏的

Agent 从 checkpoint 恢复时,加载出来的状态字段缺失、类型错误或值异常,导致 workflow 以错误状态继续执行,产生难以追踪的下游错误。本文分析 checkpoint 损坏的根因并给出写入和恢复的防御方案。

你的 LangGraph workflow 从 SQLite checkpoint 恢复后,state["processed_files"]None,而不是 34 条记录的列表——结果 Agent 重新处理了所有文件,产生了 34 条重复记录。或者在 Temporal 里,workflow 从持久化存储恢复后,一个 datetime 字段被反序列化成了字符串而不是 datetime 对象,后续的时间比较代码抛出了 TypeError,workflow 崩溃,陷入无限重启循环。Checkpoint 损坏不一定意味着文件被物理破坏——更常见的是序列化/反序列化逻辑与数据的实际类型不匹配,或者写入过程中的部分失败导致数据不一致。

常见原因

1. 序列化格式在代码升级时变了,但旧 checkpoint 没有迁移

代码从 pickle 改成了 json,或者自定义类的字段名从 processed_files 改成了 done_files,旧 checkpoint 按旧格式写入,新代码按新格式读取,字段找不到或类型不对。

怎么判断:读取最新的 checkpoint 原始内容(如读取 SQLite 数据库里的 blob),与当前代码期望的字段结构对比,检查字段名和类型是否一致。

2. 写入过程中进程崩溃,checkpoint 处于半写状态

Checkpoint 用了「先写后替换」的模式(写临时文件,完成后重命名),但进程在写临时文件的中途崩溃,临时文件存在但不完整。下次恢复时,框架加载了这个不完整的临时文件,读取到垃圾数据。

怎么判断:检查 checkpoint 目录里是否有 .tmp 后缀的文件(未完成的临时写入)。如果有,说明上次写入被中断了。

3. JSON 序列化丢失了 Python 特定类型

json.dumps()datetime(2026, 5, 25) 序列化成字符串 "2026-05-25T00:00:00"set({"a", "b"}) 序列化失败(JSON 不支持 set),Decimal("3.14") 序列化成浮点数 3.14(精度丢失)。恢复时这些类型需要手动转换,但很多代码没有做这个转换。

怎么判断:检查 AgentState 里的字段类型,如果有 datetimesetDecimalEnum,就需要自定义序列化器。

4. 并发写入导致 checkpoint 内容被部分覆盖

两个 Agent 同时更新同一个 checkpoint(如 thread_id="task-001"),后写入的 Agent 覆盖了前一个的改动,checkpoint 只保留了其中一个 Agent 的状态,另一个的改动丢失。

怎么判断:比较同一 thread_id 的 checkpoint 记录数,如果有多次写入,检查每次写入的内容是否完整包含了之前的所有状态。

5. 大型 checkpoint 被数据库的 blob 大小限制截断

SQLite 默认的 blob 大小限制是 1GB,但实际上某些框架会在 page size 层面有更低的限制。如果 checkpoint 包含大量文本(如完整的 LLM 对话历史),序列化后超过限制,写入被静默截断,读取时得到不完整的 JSON。

怎么判断:对比期望的 checkpoint 大小(序列化后的字节数)与实际写入的字节数。用 len(db.get_checkpoint(id)) vs len(serialized_state) 对比。

6. 恢复逻辑没有做完整性验证,直接使用损坏的状态

load_checkpoint() 成功返回了,但返回的对象里某些字段是 None 或默认值(因为序列化时字段丢失了)。调用方没有验证这些字段,直接使用 None 进行了后续计算,导致 NoneType 错误在运行时才暴露。

怎么判断:在恢复逻辑之后加断言,检查所有关键字段是否都有有效值。如果没有这类断言,就是盲目信任。

最短修复路径

Step 1:为 checkpoint 写入加完整性哈希

import hashlib, json

def write_checkpoint_with_integrity(path: str, state: dict):
    """写入 checkpoint 时附加内容哈希,恢复时校验完整性。"""
    payload = json.dumps(state, ensure_ascii=False, default=str, sort_keys=True)
    checksum = hashlib.sha256(payload.encode()).hexdigest()
    
    envelope = {
        "schema_version": 3,
        "checksum": checksum,
        "payload": payload
    }
    
    # 原子写入:先写 .tmp,完成后重命名
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(envelope, f)
    os.replace(tmp_path, path)  # 原子重命名

def load_checkpoint_with_integrity(path: str) -> dict:
    """恢复时校验哈希,哈希不匹配则拒绝加载。"""
    with open(path, "r", encoding="utf-8") as f:
        envelope = json.load(f)
    
    # 校验完整性
    expected_checksum = envelope["checksum"]
    actual_checksum = hashlib.sha256(envelope["payload"].encode()).hexdigest()
    
    if expected_checksum != actual_checksum:
        raise CorruptedCheckpoint(
            f"Checkpoint 完整性校验失败:期望 {expected_checksum[:8]},实际 {actual_checksum[:8]}"
        )
    
    return json.loads(envelope["payload"])

Step 2:自定义 JSON 序列化器,处理 Python 特定类型

from datetime import datetime, date
from decimal import Decimal
from enum import Enum
import json

class AgentStateEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return {"__type__": "datetime", "value": obj.isoformat()}
        if isinstance(obj, date):
            return {"__type__": "date", "value": obj.isoformat()}
        if isinstance(obj, Decimal):
            return {"__type__": "decimal", "value": str(obj)}
        if isinstance(obj, set):
            return {"__type__": "set", "value": list(obj)}
        if isinstance(obj, Enum):
            return {"__type__": "enum", "class": obj.__class__.__name__, "value": obj.value}
        return super().default(obj)

def agent_state_decoder(obj: dict):
    if "__type__" not in obj:
        return obj
    t = obj["__type__"]
    if t == "datetime":
        return datetime.fromisoformat(obj["value"])
    if t == "date":
        return date.fromisoformat(obj["value"])
    if t == "decimal":
        return Decimal(obj["value"])
    if t == "set":
        return set(obj["value"])
    return obj

# 使用
serialized = json.dumps(state, cls=AgentStateEncoder)
restored = json.loads(serialized, object_hook=agent_state_decoder)

Step 3:schema 版本迁移

CURRENT_SCHEMA_VERSION = 3

def migrate_checkpoint(data: dict) -> dict:
    """把旧版本 checkpoint 迁移到当前版本。"""
    version = data.get("schema_version", 1)
    
    if version == 1:
        # v1 -> v2:字段重命名
        data["done_files"] = data.pop("processed_files", [])
        data["schema_version"] = 2
        version = 2
    
    if version == 2:
        # v2 -> v3:新增 created_at 字段,默认当前时间
        data.setdefault("created_at", datetime.utcnow().isoformat())
        data["schema_version"] = 3
        version = 3
    
    return data

Step 4:恢复后做业务层完整性检查

def restore_and_assert(checkpoint_path: str) -> AgentState:
    data = load_checkpoint_with_integrity(checkpoint_path)
    data = migrate_checkpoint(data)
    state = AgentState(**data)
    
    # 业务层断言
    assert isinstance(state.done_files, list), "done_files 必须是列表"
    assert 0 <= state.current_step <= state.total_steps, \
        f"current_step {state.current_step} 超出范围 [0, {state.total_steps}]"
    assert state.created_at is not None, "created_at 不能为 None"
    
    # 外部一致性检查:checkpoint 里的已处理 ID 在数据库里都存在
    for file_id in state.done_files:
        if not db.file_exists(file_id):
            raise InconsistentCheckpoint(
                f"checkpoint 记录 {file_id} 已处理,但数据库里不存在该记录"
            )
    
    return state

Step 5:保留 N 个历史版本的 checkpoint,支持回滚

# 保留最近 5 个 checkpoint 版本
ls -t checkpoints/task-001.json* | tail -n +6 | xargs rm -f

# 命名规范:checkpoint-{step}-{timestamp}.json
cp checkpoint.json "checkpoints/task-001-step${STEP}-$(date +%s).json"

预防建议

  • 所有 checkpoint 写入使用原子操作(tmp + rename),防止半写状态
  • 写入 checkpoint 时附加内容哈希,恢复时验证哈希,不匹配则拒绝加载
  • 为所有非基本类型(datetime、set、Enum、Decimal)实现自定义 JSON 序列化/反序列化
  • 每次 schema 变更时递增版本号,并在 migrate_checkpoint() 里实现向后兼容的迁移
  • 恢复后做两层校验:schema 校验(字段类型和范围)+ 业务层校验(与外部系统的一致性)
  • 保留最近 5 个历史 checkpoint,支持在当前 checkpoint 损坏时回滚到上一个版本
  • 在 staging 环境定期做混沌测试:随机截断 checkpoint 文件,验证恢复逻辑能正确检测并拒绝

常见问答 (FAQ)

Q: LangGraph 的内置 checkpoint 序列化是否已经处理了这些问题? A: LangGraph 的 SqliteSaverPostgresSaver 内部用 pickle 序列化,能正确处理大多数 Python 类型(包括 datetime 和 set)。但 pickle 有版本兼容性问题——Python 版本升级或依赖库更新后,旧的 pickle 数据可能无法反序列化。推荐在 pickle 之上加完整性哈希,并在重大版本升级后主动迁移 checkpoint 数据。

Q: 如何检测生产环境里是否有损坏的 checkpoint? A: 在 checkpoint 写入时记录元数据(写入时间、数据大小、关键字段值的摘要),恢复时对比元数据。可以写一个定期巡检脚本:加载所有活跃 checkpoint,验证完整性哈希和关键字段,发现异常时告警。

Q: Temporal 的 workflow state 也会损坏吗? A: Temporal 的持久化层(Cassandra/PostgreSQL)本身有数据完整性保证,物理损坏的概率很低。但序列化/反序列化问题在 Temporal 里仍然存在——如果你自定义了 DataConverter(序列化器),或者 Temporal SDK 版本升级了序列化格式,旧的 workflow 恢复时可能出现类型不匹配。推荐使用 Temporal 官方的 PayloadConverter 而不是完全自定义。

Q: Checkpoint 过大(超过 100MB)时应该怎么处理? A: 把 checkpoint 拆分成「元数据 checkpoint」和「数据 checkpoint」两部分:元数据 checkpoint 存小的状态变量(当前步骤、计数器等),数据 checkpoint 存大型数据(文件内容、LLM 历史等)。元数据 checkpoint 每步写一次,数据 checkpoint 按需写入(只在内容变化时)。这样可以把每步 checkpoint 的写入量从 100MB 降低到几KB。

相关阅读

标签: #AI 编程 #Agents #排查