跳到主要内容
版本:Next 🚧

MySQL Checkpointing

MySQLCheckpointer is a production-grade persistence backend for deployments standardized on MySQL. It mirrors PostgresCheckpointer: aiomysql connection pool, msgpack payloads, a queryable JSON metadata column, and a host-managed schema verified on connect. Per-thread seq allocation is serialized with a SELECT … FOR UPDATE row lock so multiple processes can append to the same thread_id safely.

Requires MySQL 8.0.13+ (for JSON expression defaults) on the InnoDB engine.

Install the extra:

pip install "cubepi[mysql]"

This pulls in aiomysql, sqlalchemy, and msgpack.

Basic usage

import asyncio
from cubepi import Agent, Model
from cubepi.checkpointer import MySQLCheckpointer
from cubepi.providers.anthropic import AnthropicProvider


async def main():
provider = AnthropicProvider(api_key="…")
async with MySQLCheckpointer("mysql://user:pass@host:3306/dbname") as cp:
agent = Agent(
provider=provider,
model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
checkpointer=cp,
thread_id="user-42",
)
await agent.prompt("hello")


asyncio.run(main())

The connection is configured from a mysql://user:pass@host:port/db DSN. Pool sizing:

async with MySQLCheckpointer(
"mysql://…",
min_pool_size=2,
max_pool_size=20,
) as cp:

Schema

The checkpointer expects three tables: cubepi_threads, cubepi_messages, and cubepi_schema_version. Like Postgres (and unlike SQLite), CubePi does not create these for you — it verifies on __aenter__ that they exist with the expected schema_version.

If they're missing, you get CubepiSchemaUninitialized. If the version doesn't match this CubePi release, you get CubepiSchemaMismatch. A production database belongs to the host application's migration system (Alembic, …), not to a third-party library.

Bootstrapping via Alembic

CubePi exposes the SQLAlchemy MetaData so your migrations can adopt the schema:

# alembic/env.py
from cubepi.checkpointer.mysql import cubepi_metadata, EXPECTED_SCHEMA_VERSION

target_metadata = [my_app_metadata, cubepi_metadata]

Then autogenerate a revision:

alembic revision --autogenerate -m "add cubepi checkpointer"

The cubepi_messages table is KEY-partitioned, which SQLAlchemy's autogenerate cannot express, so append the partition clause and the schema-version write with the provided helpers:

# In a migration's upgrade():
from cubepi.checkpointer.mysql.alembic_helpers import (
messages_partition_clause,
write_schema_version_op,
)

def upgrade():
op.create_table("cubepi_threads", ...)
op.create_table(
"cubepi_messages", ...,
mysql_engine="InnoDB",
)
# KEY(thread_id) partitioning is not autogenerated — apply it explicitly:
op.execute(
"ALTER TABLE cubepi_messages " + messages_partition_clause()
)
op.create_table("cubepi_schema_version", ...)
# write_schema_version_op() returns two ';'-separated statements
# (DELETE then INSERT). MySQL/pymysql runs one statement per execute,
# so split and execute each:
for stmt in write_schema_version_op().split(";"):
if stmt.strip():
op.execute(stmt)

write_schema_version_op() is idempotent: it deletes any rows from a prior CubePi version and inserts the current one. When CubePi later bumps EXPECTED_SCHEMA_VERSION, generate a new revision and run it again.

Data model

cubepi_threads
thread_id (PK) -- VARCHAR(255) utf8mb4_bin (case-sensitive)
parent_thread_id -- self-FK, for forks
forked_at_seq -- seq number at fork point
extra -- JSON
created_at / updated_at

cubepi_messages
thread_id, seq -- composite PK; partitioned by KEY(thread_id) into 64
role -- "user" | "assistant" | "tool"
metadata -- JSON (not indexed; see below)
payload -- LONGBLOB (msgpack)
created_at

cubepi_schema_version
version (PK)

Important properties:

  • (thread_id, seq) is the message identity. seq is monotonic per thread, allocated under a SELECT … FOR UPDATE row lock on the thread row. Two concurrent writers on the same thread serialize cleanly.
  • payload is msgpack-encoded model.model_dump(mode="json"). CubePi reconstructs the Pydantic model on read.
  • metadata is JSON, queryable via MySQL JSON functions. It is not indexed — MySQL can't index a JSON column directly. If you need to query messages by metadata, add a generated-column index host-side.
  • Messages are partitioned by KEY(thread_id) into 64 partitions. Even distribution, no per-thread bottleneck. The table has no foreign key to cubepi_threads (MySQL forbids FKs on partitioned tables); integrity is preserved by the lazy thread-row insert on append.

Differences from the Postgres backend

PostgresMySQL
Driverasyncpgaiomysql
seq lockpg_advisory_xact_lockSELECT … FOR UPDATE
Messages partitioningHASH(thread_id) + FKKEY(thread_id), no FK
metadata indexGINnone (add a generated column)
Thread IDsTEXTVARCHAR(255) utf8mb4_bin
Min versionMySQL 8.0.13+, InnoDB

save_extra semantics

save_extra does a shallow top-level merge, not a replace — the same behavior as Postgres and SQLite. Writing {"foo": 1} then {"bar": 2} leaves {"foo": 1, "bar": 2}. (Internally it reads the current extra under a row lock and writes the merged dict, rather than using JSON_MERGE_PATCH, whose null-deletion and deep-merge semantics differ from dict.update.)

Common pitfalls

  • CubepiSchemaUninitialized — Your DB is empty, your migrations didn't run, or the cubepi_schema_version table is malformed. Apply the host alembic upgrade first.
  • CubepiSchemaMismatch — You upgraded CubePi but didn't generate a new migration. Generate one, apply it, and CubePi will start.
  • JSON default errors on older MySQL — JSON columns use DEFAULT (JSON_OBJECT()), which requires MySQL 8.0.13+.
  • Case-insensitive thread-ID collisions — Thread IDs use the utf8mb4_bin collation so UserA and usera stay distinct. If you hand-roll the DDL, keep that collation.

See also