跳到主要内容
版本:0.6

MySQL Checkpointing

MySQLCheckpointer 是面向 MySQL 部署的生产级持久化后端。它与 PostgresCheckpointer 保持一致:使用 aiomysql 连接池、msgpack 负载、可查询的 JSON metadata 列,以及由宿主管理、在连接时进行校验的 schema。每个 thread 的 seq 分配通过 SELECT … FOR UPDATE 行锁串行化,因此多个进程可以安全地向同一 thread_id 追加消息。

需要 MySQL 8.0.13+(支持 JSON 表达式默认值)及 InnoDB 引擎。

安装额外依赖:

pip install "cubepi[mysql]"

这会安装 aiomysqlsqlalchemymsgpack

基本用法

import asyncio
from cubepi import Agent, Model
from cubepi.checkpointer import MySQLCheckpointer
from cubepi.providers.anthropic import AnthropicProvider


async def main():
provider = AnthropicProvider(api_key="…")
async with MySQLCheckpointer("mysql://user:pass@host:3306/dbname") as cp:
agent = Agent(
provider=provider,
model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
checkpointer=cp,
thread_id="user-42",
)
await agent.prompt("hello")


asyncio.run(main())

连接通过 mysql://user:pass@host:port/db 格式的 DSN 配置。连接池大小:

async with MySQLCheckpointer(
"mysql://…",
min_pool_size=2,
max_pool_size=20,
) as cp:

Schema

Checkpointer 需要三张表:cubepi_threadscubepi_messagescubepi_schema_version。与 Postgres 一样(与 SQLite 不同),CubePi 不会为你创建这些表——它会在 __aenter__ 时验证这些表是否存在并具有预期的 schema_version

若表不存在,会抛出 CubepiSchemaUninitialized;若版本与当前 CubePi 版本不匹配,会抛出 CubepiSchemaMismatch。生产数据库应由宿主应用的迁移系统(Alembic 等)管理,而非由第三方库管理。

通过 Alembic 初始化 Schema

CubePi 暴露了 SQLAlchemy 的 MetaData,供你的迁移脚本采用该 schema:

# alembic/env.py
from cubepi.checkpointer.mysql import cubepi_metadata, EXPECTED_SCHEMA_VERSION

target_metadata = [my_app_metadata, cubepi_metadata]

然后自动生成一个 revision:

alembic revision --autogenerate -m "add cubepi checkpointer"

cubepi_messages 表使用 KEY 分区,SQLAlchemy 的自动生成无法表达这一点,因此需要使用提供的辅助函数手动追加分区子句和 schema 版本写入:

# In a migration's upgrade():
from cubepi.checkpointer.mysql.alembic_helpers import (
messages_partition_clause,
write_schema_version_op,
)

def upgrade():
op.create_table("cubepi_threads", ...)
op.create_table(
"cubepi_messages", ...,
mysql_engine="InnoDB",
)
# KEY(thread_id) partitioning is not autogenerated — apply it explicitly:
op.execute(
"ALTER TABLE cubepi_messages " + messages_partition_clause()
)
op.create_table("cubepi_schema_version", ...)
# write_schema_version_op() returns two ';'-separated statements
# (DELETE then INSERT). MySQL/pymysql runs one statement per execute,
# so split and execute each:
for stmt in write_schema_version_op().split(";"):
if stmt.strip():
op.execute(stmt)

write_schema_version_op() 是幂等的:它会删除旧 CubePi 版本的记录行并插入当前版本。当 CubePi 之后更新 EXPECTED_SCHEMA_VERSION 时,生成新的 revision 并再次运行即可。

数据模型

cubepi_threads
thread_id (PK) -- VARCHAR(255) utf8mb4_bin(大小写敏感)
parent_thread_id -- 自引用外键,用于 fork
forked_at_seq -- fork 时的 seq 编号
extra -- JSON
created_at / updated_at

cubepi_messages
thread_id, seq -- 复合主键;按 KEY(thread_id) 分区为 64 个分区
role -- "user" | "assistant" | "tool"
metadata -- JSON(未索引,见下文)
payload -- LONGBLOB(msgpack)
created_at

cubepi_schema_version
version (PK)

重要特性:

  • (thread_id, seq) 是消息的唯一标识。 seq 在每个 thread 内单调递增,在对 thread 行持有 SELECT … FOR UPDATE 行锁的情况下分配。同一 thread 的两个并发写入者可以干净地串行化。
  • payload 是 msgpack 编码的 model.model_dump(mode="json") CubePi 在读取时重建 Pydantic 模型。
  • metadata 是 JSON,可通过 MySQL JSON 函数查询。建立索引——MySQL 无法直接对 JSON 列建索引。若需要按 metadata 查询消息,请在宿主侧添加生成列索引。
  • 消息按 KEY(thread_id) 分区为 64 个分区。 分布均匀,无 per-thread 瓶颈。该表对 cubepi_threads 没有外键(MySQL 禁止对分区表使用外键);完整性由 append 时的惰性 thread 行插入来保证。

与 Postgres 后端的差异

PostgresMySQL
驱动asyncpgaiomysql
seqpg_advisory_xact_lockSELECT … FOR UPDATE
消息分区HASH(thread_id) + FKKEY(thread_id),无 FK
metadata 索引GIN无(添加生成列)
Thread IDTEXTVARCHAR(255) utf8mb4_bin
最低版本MySQL 8.0.13+,InnoDB

save_extra 语义

save_extra 执行浅层顶级合并,而非替换——与 Postgres 和 SQLite 的行为相同。先写入 {"foo": 1} 再写入 {"bar": 2} 后,结果为 {"foo": 1, "bar": 2}。(内部实现是在行锁保护下读取当前 extra,然后写入合并后的字典,而非使用 JSON_MERGE_PATCH——后者的 null 删除和深度合并语义与 dict.update 不同。)

常见陷阱

  • CubepiSchemaUninitialized —— 数据库为空、迁移未执行,或 cubepi_schema_version 表结构有误。请先应用宿主的 alembic upgrade。
  • CubepiSchemaMismatch —— 升级了 CubePi 但未生成新的迁移。生成迁移、应用后 CubePi 即可启动。
  • 旧版 MySQL 上的 JSON 默认值错误 —— JSON 列使用 DEFAULT (JSON_OBJECT()),需要 MySQL 8.0.13+。
  • 大小写不敏感的 thread ID 冲突 —— Thread ID 使用 utf8mb4_bin 排序规则,因此 UserAusera 保持区分。若手写 DDL,请保持该排序规则。

另请参阅