你的 LangGraph workflow 从 SQLite checkpoint 恢复后,state["processed_files"] 是 None,而不是 34 条记录的列表——结果 Agent 重新处理了所有文件,产生了 34 条重复记录。或者在 Temporal 里,workflow 从持久化存储恢复后,一个 datetime 字段被反序列化成了字符串而不是 datetime 对象,后续的时间比较代码抛出了 TypeError,workflow 崩溃,陷入无限重启循环。Checkpoint 损坏不一定意味着文件被物理破坏——更常见的是序列化/反序列化逻辑与数据的实际类型不匹配,或者写入过程中的部分失败导致数据不一致。
常见原因
1. 序列化格式在代码升级时变了,但旧 checkpoint 没有迁移
代码从 pickle 改成了 json,或者自定义类的字段名从 processed_files 改成了 done_files,旧 checkpoint 按旧格式写入,新代码按新格式读取,字段找不到或类型不对。
怎么判断:读取最新的 checkpoint 原始内容(如读取 SQLite 数据库里的 blob),与当前代码期望的字段结构对比,检查字段名和类型是否一致。
2. 写入过程中进程崩溃,checkpoint 处于半写状态
Checkpoint 用了「先写后替换」的模式(写临时文件,完成后重命名),但进程在写临时文件的中途崩溃,临时文件存在但不完整。下次恢复时,框架加载了这个不完整的临时文件,读取到垃圾数据。
怎么判断:检查 checkpoint 目录里是否有 .tmp 后缀的文件(未完成的临时写入)。如果有,说明上次写入被中断了。
3. JSON 序列化丢失了 Python 特定类型
json.dumps() 把 datetime(2026, 5, 25) 序列化成字符串 "2026-05-25T00:00:00",set({"a", "b"}) 序列化失败(JSON 不支持 set),Decimal("3.14") 序列化成浮点数 3.14(精度丢失)。恢复时这些类型需要手动转换,但很多代码没有做这个转换。
怎么判断:检查 AgentState 里的字段类型,如果有 datetime、set、Decimal、Enum,就需要自定义序列化器。
4. 并发写入导致 checkpoint 内容被部分覆盖
两个 Agent 同时更新同一个 checkpoint(如 thread_id="task-001"),后写入的 Agent 覆盖了前一个的改动,checkpoint 只保留了其中一个 Agent 的状态,另一个的改动丢失。
怎么判断:比较同一 thread_id 的 checkpoint 记录数,如果有多次写入,检查每次写入的内容是否完整包含了之前的所有状态。
5. 大型 checkpoint 被数据库的 blob 大小限制截断
SQLite 默认的 blob 大小限制是 1GB,但实际上某些框架会在 page size 层面有更低的限制。如果 checkpoint 包含大量文本(如完整的 LLM 对话历史),序列化后超过限制,写入被静默截断,读取时得到不完整的 JSON。
怎么判断:对比期望的 checkpoint 大小(序列化后的字节数)与实际写入的字节数。用 len(db.get_checkpoint(id)) vs len(serialized_state) 对比。
6. 恢复逻辑没有做完整性验证,直接使用损坏的状态
load_checkpoint() 成功返回了,但返回的对象里某些字段是 None 或默认值(因为序列化时字段丢失了)。调用方没有验证这些字段,直接使用 None 进行了后续计算,导致 NoneType 错误在运行时才暴露。
怎么判断:在恢复逻辑之后加断言,检查所有关键字段是否都有有效值。如果没有这类断言,就是盲目信任。
最短修复路径
Step 1:为 checkpoint 写入加完整性哈希
import hashlib, json
def write_checkpoint_with_integrity(path: str, state: dict):
"""写入 checkpoint 时附加内容哈希,恢复时校验完整性。"""
payload = json.dumps(state, ensure_ascii=False, default=str, sort_keys=True)
checksum = hashlib.sha256(payload.encode()).hexdigest()
envelope = {
"schema_version": 3,
"checksum": checksum,
"payload": payload
}
# 原子写入:先写 .tmp,完成后重命名
tmp_path = path + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(envelope, f)
os.replace(tmp_path, path) # 原子重命名
def load_checkpoint_with_integrity(path: str) -> dict:
"""恢复时校验哈希,哈希不匹配则拒绝加载。"""
with open(path, "r", encoding="utf-8") as f:
envelope = json.load(f)
# 校验完整性
expected_checksum = envelope["checksum"]
actual_checksum = hashlib.sha256(envelope["payload"].encode()).hexdigest()
if expected_checksum != actual_checksum:
raise CorruptedCheckpoint(
f"Checkpoint 完整性校验失败:期望 {expected_checksum[:8]},实际 {actual_checksum[:8]}"
)
return json.loads(envelope["payload"])
Step 2:自定义 JSON 序列化器,处理 Python 特定类型
from datetime import datetime, date
from decimal import Decimal
from enum import Enum
import json
class AgentStateEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return {"__type__": "datetime", "value": obj.isoformat()}
if isinstance(obj, date):
return {"__type__": "date", "value": obj.isoformat()}
if isinstance(obj, Decimal):
return {"__type__": "decimal", "value": str(obj)}
if isinstance(obj, set):
return {"__type__": "set", "value": list(obj)}
if isinstance(obj, Enum):
return {"__type__": "enum", "class": obj.__class__.__name__, "value": obj.value}
return super().default(obj)
def agent_state_decoder(obj: dict):
if "__type__" not in obj:
return obj
t = obj["__type__"]
if t == "datetime":
return datetime.fromisoformat(obj["value"])
if t == "date":
return date.fromisoformat(obj["value"])
if t == "decimal":
return Decimal(obj["value"])
if t == "set":
return set(obj["value"])
return obj
# 使用
serialized = json.dumps(state, cls=AgentStateEncoder)
restored = json.loads(serialized, object_hook=agent_state_decoder)
Step 3:schema 版本迁移
CURRENT_SCHEMA_VERSION = 3
def migrate_checkpoint(data: dict) -> dict:
"""把旧版本 checkpoint 迁移到当前版本。"""
version = data.get("schema_version", 1)
if version == 1:
# v1 -> v2:字段重命名
data["done_files"] = data.pop("processed_files", [])
data["schema_version"] = 2
version = 2
if version == 2:
# v2 -> v3:新增 created_at 字段,默认当前时间
data.setdefault("created_at", datetime.utcnow().isoformat())
data["schema_version"] = 3
version = 3
return data
Step 4:恢复后做业务层完整性检查
def restore_and_assert(checkpoint_path: str) -> AgentState:
data = load_checkpoint_with_integrity(checkpoint_path)
data = migrate_checkpoint(data)
state = AgentState(**data)
# 业务层断言
assert isinstance(state.done_files, list), "done_files 必须是列表"
assert 0 <= state.current_step <= state.total_steps, \
f"current_step {state.current_step} 超出范围 [0, {state.total_steps}]"
assert state.created_at is not None, "created_at 不能为 None"
# 外部一致性检查:checkpoint 里的已处理 ID 在数据库里都存在
for file_id in state.done_files:
if not db.file_exists(file_id):
raise InconsistentCheckpoint(
f"checkpoint 记录 {file_id} 已处理,但数据库里不存在该记录"
)
return state
Step 5:保留 N 个历史版本的 checkpoint,支持回滚
# 保留最近 5 个 checkpoint 版本
ls -t checkpoints/task-001.json* | tail -n +6 | xargs rm -f
# 命名规范:checkpoint-{step}-{timestamp}.json
cp checkpoint.json "checkpoints/task-001-step${STEP}-$(date +%s).json"
预防建议
- 所有 checkpoint 写入使用原子操作(tmp + rename),防止半写状态
- 写入 checkpoint 时附加内容哈希,恢复时验证哈希,不匹配则拒绝加载
- 为所有非基本类型(datetime、set、Enum、Decimal)实现自定义 JSON 序列化/反序列化
- 每次 schema 变更时递增版本号,并在
migrate_checkpoint()里实现向后兼容的迁移 - 恢复后做两层校验:schema 校验(字段类型和范围)+ 业务层校验(与外部系统的一致性)
- 保留最近 5 个历史 checkpoint,支持在当前 checkpoint 损坏时回滚到上一个版本
- 在 staging 环境定期做混沌测试:随机截断 checkpoint 文件,验证恢复逻辑能正确检测并拒绝
常见问答 (FAQ)
Q: LangGraph 的内置 checkpoint 序列化是否已经处理了这些问题?
A: LangGraph 的 SqliteSaver 和 PostgresSaver 内部用 pickle 序列化,能正确处理大多数 Python 类型(包括 datetime 和 set)。但 pickle 有版本兼容性问题——Python 版本升级或依赖库更新后,旧的 pickle 数据可能无法反序列化。推荐在 pickle 之上加完整性哈希,并在重大版本升级后主动迁移 checkpoint 数据。
Q: 如何检测生产环境里是否有损坏的 checkpoint? A: 在 checkpoint 写入时记录元数据(写入时间、数据大小、关键字段值的摘要),恢复时对比元数据。可以写一个定期巡检脚本:加载所有活跃 checkpoint,验证完整性哈希和关键字段,发现异常时告警。
Q: Temporal 的 workflow state 也会损坏吗?
A: Temporal 的持久化层(Cassandra/PostgreSQL)本身有数据完整性保证,物理损坏的概率很低。但序列化/反序列化问题在 Temporal 里仍然存在——如果你自定义了 DataConverter(序列化器),或者 Temporal SDK 版本升级了序列化格式,旧的 workflow 恢复时可能出现类型不匹配。推荐使用 Temporal 官方的 PayloadConverter 而不是完全自定义。
Q: Checkpoint 过大(超过 100MB)时应该怎么处理? A: 把 checkpoint 拆分成「元数据 checkpoint」和「数据 checkpoint」两部分:元数据 checkpoint 存小的状态变量(当前步骤、计数器等),数据 checkpoint 存大型数据(文件内容、LLM 历史等)。元数据 checkpoint 每步写一次,数据 checkpoint 按需写入(只在内容变化时)。这样可以把每步 checkpoint 的写入量从 100MB 降低到几KB。