MySQL Checkpointing
MySQLCheckpointer 是面向 MySQL 部署的生产级持久化后端。它与 PostgresCheckpointer 保持一致:使用 aiomysql 连接池、msgpack 负载、可查询的 JSON metadata 列,以及由宿主管理、在连接时进行校验的 schema。每个 thread 的 seq 分配通过 SELECT … FOR UPDATE 行锁串行化,因此多个进程可以安全地向同一 thread_id 追加消息。
需要 MySQL 8.0.13+(支持 JSON 表达式默认值)及 InnoDB 引擎。
安装额外依赖:
pip install "cubepi[mysql]"
这会安装 aiomysql、sqlalchemy 和 msgpack。
基本用法
import asyncio
from cubepi import Agent, Model
from cubepi.checkpointer import MySQLCheckpointer
from cubepi.providers.anthropic import AnthropicProvider
async def main():
provider = AnthropicProvider(api_key="…")
async with MySQLCheckpointer("mysql://user:pass@host:3306/dbname") as cp:
agent = Agent(
provider=provider,
model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
checkpointer=cp,
thread_id="user-42",
)
await agent.prompt("hello")
asyncio.run(main())
连接通过 mysql://user:pass@host:port/db 格式的 DSN 配置。连接池大小:
async with MySQLCheckpointer(
"mysql://…",
min_pool_size=2,
max_pool_size=20,
) as cp:
…
Schema
Checkpointer 需要三张表:cubepi_threads、cubepi_messages 和 cubepi_schema_version。与 Postgres 一样(与 SQLite 不同),CubePi 不会为你创建这些表——它会在 __aenter__ 时验证这些表是否存在并具有预期的 schema_version。
若表不存在,会抛出 CubepiSchemaUninitialized;若版本与当前 CubePi 版本不匹配,会抛出 CubepiSchemaMismatch。生产数据库应由宿主应用的迁移系统(Alembic 等)管理,而非由第三方库管理。
通过 Alembic 初始化 Schema
CubePi 暴露了 SQLAlchemy 的 MetaData,供你的迁移脚本采用该 schema:
# alembic/env.py
from cubepi.checkpointer.mysql import cubepi_metadata, EXPECTED_SCHEMA_VERSION
target_metadata = [my_app_metadata, cubepi_metadata]
然后自动生成一个 revision:
alembic revision --autogenerate -m "add cubepi checkpointer"
cubepi_messages 表使用 KEY 分区,SQLAlchemy 的自动生成无法表达这一点,因此需要使用提供的辅助函数手动追加分区子句和 schema 版本写入:
# In a migration's upgrade():
from cubepi.checkpointer.mysql.alembic_helpers import (
messages_partition_clause,
write_schema_version_op,
)
def upgrade():
op.create_table("cubepi_threads", ...)
op.create_table(
"cubepi_messages", ...,
mysql_engine="InnoDB",
)
# KEY(thread_id) partitioning is not autogenerated — apply it explicitly:
op.execute(
"ALTER TABLE cubepi_messages " + messages_partition_clause()
)
op.create_table("cubepi_schema_version", ...)
# write_schema_version_op() returns two ';'-separated statements
# (DELETE then INSERT). MySQL/pymysql runs one statement per execute,
# so split and execute each:
for stmt in write_schema_version_op().split(";"):
if stmt.strip():
op.execute(stmt)
write_schema_version_op() 是幂等的:它会删除旧 CubePi 版本的记录行并插入当前版本。当 CubePi 之后更新 EXPECTED_SCHEMA_VERSION 时,生成新的 revision 并再次运行即可。
数据模型
cubepi_threads
thread_id (PK) -- VARCHAR(255) utf8mb4_bin(大小写敏感)
parent_thread_id -- 自引用外键,用于 fork
forked_at_seq -- fork 时的 seq 编号
extra -- JSON
created_at / updated_at
cubepi_messages
thread_id, seq -- 复合主键;按 KEY(thread_id) 分区为 64 个分区
role -- "user" | "assistant" | "tool"
metadata -- JSON(未索引,见下文)
payload -- LONGBLOB(msgpack)
created_at
cubepi_schema_version
version (PK)
重要特性:
(thread_id, seq)是消息的唯一标识。seq在每个 thread 内单调递增,在对 thread 行持有SELECT … FOR UPDATE行锁的情况下分配。同一 thread 的两个并发写入者可以干净地串行化。payload是 msgpack 编码的model.model_dump(mode="json")。 CubePi 在读取时重建 Pydantic 模型。metadata是 JSON,可通过 MySQL JSON 函数查询。 它未建立索引——MySQL 无法直接对 JSON 列建索引。若需要按 metadata 查询消息,请在宿主侧添加生成列索引。- 消息按
KEY(thread_id)分区为 64 个分区。 分布均匀,无 per-thread 瓶颈。该表对cubepi_threads没有外键(MySQL 禁止对分区表使用外键);完整性由append时的惰性 thread 行插入来保证。
与 Postgres 后端的差异
| Postgres | MySQL | |
|---|---|---|
| 驱动 | asyncpg | aiomysql |
seq 锁 | pg_advisory_xact_lock | SELECT … FOR UPDATE |
| 消息分区 | HASH(thread_id) + FK | KEY(thread_id),无 FK |
metadata 索引 | GIN | 无(添加生成列) |
| Thread ID | TEXT | VARCHAR(255) utf8mb4_bin |
| 最低版本 | — | MySQL 8.0.13+,InnoDB |
save_extra 语义
save_extra 执行浅层顶级合并,而非替换——与 Postgres 和 SQLite 的行为相同。先写入 {"foo": 1} 再写入 {"bar": 2} 后,结果为 {"foo": 1, "bar": 2}。(内部实现是在行锁保护下读取当前 extra,然后写入合并后的字典,而非使用 JSON_MERGE_PATCH——后者的 null 删除和深度合并语义与 dict.update 不同。)
常见陷阱
CubepiSchemaUninitialized—— 数据库为空、迁移未执行,或cubepi_schema_version表结构有误。请先应用宿主的 alembic upgrade。CubepiSchemaMismatch—— 升级了 CubePi 但未生成新的迁移。生成迁移、应用后 CubePi 即可启动。- 旧版 MySQL 上的 JSON 默认值错误 —— JSON 列使用
DEFAULT (JSON_OBJECT()),需要 MySQL 8.0.13+。 - 大小写不敏感的 thread ID 冲突 —— Thread ID 使用
utf8mb4_bin排序规则,因此UserA和usera保持区分。若手写 DDL,请保持该排序规则。
另请参阅
- Postgres Checkpointing —— Postgres 后端的对应文档。
- SQLite Checkpointing —— 单进程替代方案。
- Custom Backends —— Protocol 详情。
- Package README —— 与代码同处的完整宿主集成手册。