MySQL Checkpointing
MySQLCheckpointer is a production-grade persistence backend for
deployments standardized on MySQL. It mirrors
PostgresCheckpointer: aiomysql connection pool,
msgpack payloads, a queryable JSON metadata column, and a
host-managed schema verified on connect. Per-thread seq allocation is
serialized with a SELECT … FOR UPDATE row lock so multiple processes
can append to the same thread_id safely.
Requires MySQL 8.0.13+ (for JSON expression defaults) on the InnoDB engine.
Install the extra:
pip install "cubepi[mysql]"
This pulls in aiomysql, sqlalchemy, and msgpack.
Basic usage
import asyncio
from cubepi import Agent, Model
from cubepi.checkpointer import MySQLCheckpointer
from cubepi.providers.anthropic import AnthropicProvider
async def main():
provider = AnthropicProvider(api_key="…")
async with MySQLCheckpointer("mysql://user:pass@host:3306/dbname") as cp:
agent = Agent(
provider=provider,
model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
checkpointer=cp,
thread_id="user-42",
)
await agent.prompt("hello")
asyncio.run(main())
The connection is configured from a mysql://user:pass@host:port/db
DSN. Pool sizing:
async with MySQLCheckpointer(
"mysql://…",
min_pool_size=2,
max_pool_size=20,
) as cp:
…
Schema
The checkpointer expects three tables: cubepi_threads,
cubepi_messages, and cubepi_schema_version. Like Postgres (and
unlike SQLite), CubePi does not create these for you — it verifies
on __aenter__ that they exist with the expected schema_version.
If they're missing, you get CubepiSchemaUninitialized. If the version
doesn't match this CubePi release, you get CubepiSchemaMismatch. A
production database belongs to the host application's migration system
(Alembic, …), not to a third-party library.
Bootstrapping via Alembic
CubePi exposes the SQLAlchemy MetaData so your migrations can adopt
the schema:
# alembic/env.py
from cubepi.checkpointer.mysql import cubepi_metadata, EXPECTED_SCHEMA_VERSION
target_metadata = [my_app_metadata, cubepi_metadata]
Then autogenerate a revision:
alembic revision --autogenerate -m "add cubepi checkpointer"
The cubepi_messages table is KEY-partitioned, which SQLAlchemy's
autogenerate cannot express, so append the partition clause and the
schema-version write with the provided helpers:
# In a migration's upgrade():
from cubepi.checkpointer.mysql.alembic_helpers import (
messages_partition_clause,
write_schema_version_op,
)
def upgrade():
op.create_table("cubepi_threads", ...)
op.create_table(
"cubepi_messages", ...,
mysql_engine="InnoDB",
)
# KEY(thread_id) partitioning is not autogenerated — apply it explicitly:
op.execute(
"ALTER TABLE cubepi_messages " + messages_partition_clause()
)
op.create_table("cubepi_schema_version", ...)
# write_schema_version_op() returns two ';'-separated statements
# (DELETE then INSERT). MySQL/pymysql runs one statement per execute,
# so split and execute each:
for stmt in write_schema_version_op().split(";"):
if stmt.strip():
op.execute(stmt)
write_schema_version_op() is idempotent: it deletes any rows from a
prior CubePi version and inserts the current one. When CubePi later
bumps EXPECTED_SCHEMA_VERSION, generate a new revision and run it
again.
Data model
cubepi_threads
thread_id (PK) -- VARCHAR(255) utf8mb4_bin (case-sensitive)
parent_thread_id -- self-FK, for forks
forked_at_seq -- seq number at fork point
extra -- JSON
created_at / updated_at
cubepi_messages
thread_id, seq -- composite PK; partitioned by KEY(thread_id) into 64
role -- "user" | "assistant" | "tool"
metadata -- JSON (not indexed; see below)
payload -- LONGBLOB (msgpack)
created_at
cubepi_schema_version
version (PK)
Important properties:
(thread_id, seq)is the message identity.seqis monotonic per thread, allocated under aSELECT … FOR UPDATErow lock on the thread row. Two concurrent writers on the same thread serialize cleanly.payloadis msgpack-encodedmodel.model_dump(mode="json"). CubePi reconstructs the Pydantic model on read.metadatais JSON, queryable via MySQL JSON functions. It is not indexed — MySQL can't index a JSON column directly. If you need to query messages by metadata, add a generated-column index host-side.- Messages are partitioned by
KEY(thread_id)into 64 partitions. Even distribution, no per-thread bottleneck. The table has no foreign key tocubepi_threads(MySQL forbids FKs on partitioned tables); integrity is preserved by the lazy thread-row insert onappend.
Differences from the Postgres backend
| Postgres | MySQL | |
|---|---|---|
| Driver | asyncpg | aiomysql |
seq lock | pg_advisory_xact_lock | SELECT … FOR UPDATE |
| Messages partitioning | HASH(thread_id) + FK | KEY(thread_id), no FK |
metadata index | GIN | none (add a generated column) |
| Thread IDs | TEXT | VARCHAR(255) utf8mb4_bin |
| Min version | — | MySQL 8.0.13+, InnoDB |
save_extra semantics
save_extra does a shallow top-level merge, not a replace — the same
behavior as Postgres and SQLite. Writing {"foo": 1} then {"bar": 2}
leaves {"foo": 1, "bar": 2}. (Internally it reads the current extra
under a row lock and writes the merged dict, rather than using
JSON_MERGE_PATCH, whose null-deletion and deep-merge semantics differ
from dict.update.)
Common pitfalls
CubepiSchemaUninitialized— Your DB is empty, your migrations didn't run, or thecubepi_schema_versiontable is malformed. Apply the host alembic upgrade first.CubepiSchemaMismatch— You upgraded CubePi but didn't generate a new migration. Generate one, apply it, and CubePi will start.- JSON default errors on older MySQL — JSON columns use
DEFAULT (JSON_OBJECT()), which requires MySQL 8.0.13+. - Case-insensitive thread-ID collisions — Thread IDs use the
utf8mb4_bincollation soUserAanduserastay distinct. If you hand-roll the DDL, keep that collation.
See also
- Postgres Checkpointing — the Postgres-backed sibling.
- SQLite Checkpointing — single-process alternative.
- Custom Backends — Protocol details.
- Package README — the full host-integration runbook next to the code.