Postgres Checkpointing
PostgresCheckpointer is the production-grade persistence backend.
It uses asyncpg with a connection pool, msgpack for payloads, and
a per-thread Postgres advisory lock so multiple processes can write to
the same thread_id without trampling each other.
Install the extra:
```bash
pip install "cubepi[postgres]"
```
This pulls in asyncpg, sqlalchemy, and msgpack.
Basic usage
```python
import asyncio

from cubepi import Agent, Model
from cubepi.checkpointer import PostgresCheckpointer
from cubepi.providers.anthropic import AnthropicProvider


async def main():
    provider = AnthropicProvider(api_key="...")
    async with PostgresCheckpointer("postgresql://user:pass@host/dbname") as cp:
        agent = Agent(
            provider=provider,
            model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
            checkpointer=cp,
            thread_id="user-42",
        )
        await agent.prompt("hello")


asyncio.run(main())
```
The DSN is whatever asyncpg.create_pool(...) accepts. Pool sizing:
```python
async with PostgresCheckpointer(
    "postgresql://...",
    min_pool_size=2,
    max_pool_size=20,
) as cp:
    ...
```
Schema
The checkpointer expects three tables: cubepi_threads,
cubepi_messages, and cubepi_schema_version. Unlike the SQLite backend, CubePi
does not create these for you; instead, on __aenter__ it verifies that
they exist and carry the expected schema_version.
If they're missing, you get CubepiSchemaUninitialized. If the
version doesn't match this cubepi release, you get
CubepiSchemaMismatch.
The reason: a production database belongs to the host application's migration system (Alembic, Atlas, …), not to a third-party library that might fight your existing migrations.
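The verification step can be pictured as the following minimal sketch, which checks plain Python values instead of querying a live database. The function verify_schema, the local exception classes standing in for CubepiSchemaUninitialized and CubepiSchemaMismatch, and the assumption that the schema version is a single integer are all hypothetical; CubePi's real check may be structured differently.

```python
# Hypothetical sketch of a startup schema check; not CubePi's implementation.
REQUIRED_TABLES = frozenset(
    {"cubepi_threads", "cubepi_messages", "cubepi_schema_version"}
)


class SchemaUninitialized(Exception):
    """Local stand-in for CubepiSchemaUninitialized."""


class SchemaMismatch(Exception):
    """Local stand-in for CubepiSchemaMismatch."""


def verify_schema(existing_tables, recorded_versions, expected_version):
    """Raise if tables are missing or the recorded version is unexpected.

    existing_tables: names of the tables found in the database.
    recorded_versions: every row of cubepi_schema_version (assumed integers).
    """
    missing = REQUIRED_TABLES - set(existing_tables)
    if missing:
        raise SchemaUninitialized(f"missing tables: {sorted(missing)}")
    if list(recorded_versions) != [expected_version]:
        raise SchemaMismatch(
            f"recorded {list(recorded_versions)}, expected {expected_version}"
        )


# A fully migrated database passes silently.
verify_schema(REQUIRED_TABLES, [3], expected_version=3)

# A database migrated for an older release is rejected.
try:
    verify_schema(REQUIRED_TABLES, [2], expected_version=3)
except SchemaMismatch as exc:
    print(exc)  # recorded [2], expected 3
```

The order of the two checks mirrors the two errors above: missing tables are reported first, and only a complete schema is compared against the expected version.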
Bootstrapping via Alembic
CubePi exposes the SQLAlchemy MetaData so your migrations can adopt
the schema:
```python
# alembic/env.py
from cubepi.checkpointer.postgres import cubepi_metadata, EXPECTED_SCHEMA_VERSION

# my_app_metadata is your application's own SQLAlchemy MetaData.
target_metadata = [my_app_metadata, cubepi_metadata]
```
Then generate a revision and apply it. Besides creating the tables, the migration must create the message partitions and INSERT the schema version. Use the helpers:
```python
# In an Alembic migration module:
from alembic import op

from cubepi.checkpointer.postgres.alembic_helpers import (
    create_message_partitions_op,
    write_schema_version_op,
)


def upgrade():
    op.create_table(...)  # auto-generated from cubepi_metadata
    op.execute(create_message_partitions_op())  # creates the 64 hash partitions
    op.execute(write_schema_version_op())  # records EXPECTED_SCHEMA_VERSION
```
Both helpers return a SQL string, which you pass to op.execute(...).
write_schema_version_op() is idempotent: it deletes any rows from a
prior cubepi version and inserts the current one.
When CubePi later upgrades and bumps EXPECTED_SCHEMA_VERSION, you
generate a new revision and call op.execute(write_schema_version_op())
again.
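Why re-running write_schema_version_op() is safe can be shown with a delete-then-insert inside one transaction. This sketch uses the standard-library sqlite3 module as a stand-in for Postgres; the write_schema_version function and the integer versions are illustrative assumptions, and the SQL that the real helper returns may differ.

```python
import sqlite3


def write_schema_version(conn: sqlite3.Connection, version: int) -> None:
    """Hypothetical delete-then-insert; CubePi's emitted SQL may differ."""
    with conn:  # one transaction: commit on success, roll back on error
        # Remove whatever an earlier release recorded...
        conn.execute("DELETE FROM cubepi_schema_version")
        # ...then record the current release's version.
        conn.execute(
            "INSERT INTO cubepi_schema_version (version) VALUES (?)", (version,)
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cubepi_schema_version (version INTEGER PRIMARY KEY)")

write_schema_version(conn, 1)  # initial migration
write_schema_version(conn, 2)  # a later release bumps the version
write_schema_version(conn, 2)  # re-running the same migration is harmless

print(conn.execute("SELECT version FROM cubepi_schema_version").fetchall())  # [(2,)]
```

Because the delete and the insert share a transaction, a failure between them leaves the previously recorded version in place.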
Data model
```text
cubepi_threads
  thread_id (PK)
  parent_thread_id   -- for forks
  forked_at_seq      -- seq number at fork point
  extra              -- JSONB
  created_at / updated_at

cubepi_messages
  thread_id, seq     -- composite PK; partitioned by HASH(thread_id) into 64
  role               -- "user" | "assistant" | "tool"
  metadata           -- JSONB (indexed via GIN)
  payload            -- bytea (msgpack)
  created_at

cubepi_schema_version
  version (PK)
```
Important properties:
- (thread_id, seq) is the message identity.
- seq is monotonic per thread, allocated under a pg_advisory_xact_lock(hashtext(thread_id)). Two concurrent writers on the same thread serialize cleanly.
- payload is msgpack-encoded model.model_dump(mode="json"). CubePi reconstructs the Pydantic model on read.
- metadata is JSONB and queryable. The full message also carries metadata inside the payload, but the column is the canonical view for SQL queries.
- cubepi_messages is partitioned by HASH(thread_id) into 64 partitions: even distribution across partitions, no per-thread bottleneck.
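Hash partitioning routes each row by its thread_id alone, so one thread's messages always land in the same partition while different threads spread across all 64. The sketch below uses zlib.crc32 as a stand-in for Postgres's internal partition hash, so the concrete partition numbers it produces will not match a real database; partition_for is a hypothetical name.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 64


def partition_for(thread_id: str) -> int:
    """Pick a partition for a thread. zlib.crc32 stands in for Postgres's
    internal partition hash, so real assignments will differ."""
    return zlib.crc32(thread_id.encode("utf-8")) % NUM_PARTITIONS


# Routing depends only on thread_id, so every message of a thread
# goes to the same partition...
print(partition_for("user-42") == partition_for("user-42"))  # True

# ...while many distinct threads spread out across the partitions.
counts = Counter(partition_for(f"user-{i}") for i in range(10_000))
print(f"{len(counts)} partitions used, "
      f"{min(counts.values())}-{max(counts.values())} threads each")
```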
Concurrency
The advisory lock makes append-on-the-same-thread safe across processes:
```python
# Process A and Process B both append to thread "user-42" at the same time.
# They serialize through pg_advisory_xact_lock and each gets a consecutive seq.
```
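The serialization the advisory lock provides can be modeled in a single process with one asyncio.Lock per thread_id. This is a hypothetical stand-in: InMemoryThreadLog is not a CubePi class, and an asyncio.Lock does not coordinate separate processes the way pg_advisory_xact_lock does. It does show why the lock matters, though: allocating seq is a read followed by a write, and without the lock concurrent writers read the same value.

```python
import asyncio
from collections import defaultdict


class InMemoryThreadLog:
    """Hypothetical stand-in for the append path. One asyncio.Lock per
    thread_id plays the role of pg_advisory_xact_lock(hashtext(thread_id));
    it only serializes coroutines within one process."""

    def __init__(self) -> None:
        self._locks = defaultdict(asyncio.Lock)
        self._rows = defaultdict(list)  # thread_id -> [(seq, role), ...]

    async def _allocate_and_insert(self, thread_id: str, role: str) -> int:
        rows = self._rows[thread_id]
        seq = rows[-1][0] + 1 if rows else 0  # like "SELECT max(seq) + 1"
        await asyncio.sleep(0)  # yield, as a real database round-trip would
        rows.append((seq, role))  # like "INSERT ... (thread_id, seq, ...)"
        return seq

    async def append(self, thread_id: str, role: str, use_lock: bool = True) -> int:
        if use_lock:
            async with self._locks[thread_id]:
                return await self._allocate_and_insert(thread_id, role)
        return await self._allocate_and_insert(thread_id, role)


async def burst(use_lock: bool) -> list:
    log = InMemoryThreadLog()
    writers = (log.append("user-42", "user", use_lock) for _ in range(10))
    return sorted(await asyncio.gather(*writers))


print(asyncio.run(burst(use_lock=True)))   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(asyncio.run(burst(use_lock=False)))  # duplicate seqs: the race the lock prevents
```

In Postgres the unlocked case would not silently store duplicates; the second insert of the same (thread_id, seq) would violate the composite primary key.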
Reads (load) take no lock; they're snapshot-consistent within the
transaction.
The pool default (min_pool_size=1, max_pool_size=10) is fine for most apps; bump
max_pool_size if you run many concurrent agents.
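With a capped pool, agents beyond max_pool_size typically queue for a free connection, so excess concurrency shows up as latency. The sketch below models the pool with asyncio.Semaphore; it is a rough stand-in, not asyncpg's pool implementation, and peak_connections is a hypothetical helper.

```python
import asyncio


async def peak_connections(num_agents: int, max_pool_size: int) -> int:
    """Run num_agents concurrent 'agent turns' against a pool capped at
    max_pool_size and report the most connections checked out at once.
    asyncio.Semaphore stands in for the real connection pool."""
    pool = asyncio.Semaphore(max_pool_size)
    in_use = 0
    peak = 0

    async def agent_turn() -> None:
        nonlocal in_use, peak
        async with pool:  # acquire a "connection"; waits while all are busy
            in_use += 1
            peak = max(peak, in_use)
            await asyncio.sleep(0.01)  # simulated query time
            in_use -= 1

    await asyncio.gather(*(agent_turn() for _ in range(num_agents)))
    return peak


# 50 agents share 10 connections: the other 40 wait until one is released.
print(asyncio.run(peak_connections(num_agents=50, max_pool_size=10)))  # 10
```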
save_extra semantics
save_extra does a JSONB merge, not a replace:
```sql
extra = cubepi_threads.extra || EXCLUDED.extra
```
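So writing {"foo": 1} then {"bar": 2} leaves {"foo": 1, "bar": 2}. Middleware can safely write partial dicts without losing prior
keys. As with any jsonb || jsonb, the merge is shallow: a nested object under an existing key is replaced wholesale, not merged.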
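The merge behaves like a dict merge in which the incoming keys win. jsonb_concat below is a hypothetical helper that models only the || operator on two JSON objects, not CubePi's save_extra itself.

```python
def jsonb_concat(existing: dict, incoming: dict) -> dict:
    """Mimic Postgres `existing || incoming` for two JSONB objects:
    keys from `incoming` win, and the merge is shallow, so a nested
    object under the same key is replaced rather than merged."""
    return {**existing, **incoming}


extra: dict = {}
extra = jsonb_concat(extra, {"foo": 1})
extra = jsonb_concat(extra, {"bar": 2})
print(extra)  # {'foo': 1, 'bar': 2}

# Shallow: the second write replaces the whole "cfg" object.
print(jsonb_concat({"cfg": {"a": 1, "b": 2}}, {"cfg": {"a": 9}}))  # {'cfg': {'a': 9}}
```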
Forks
The parent_thread_id and forked_at_seq columns exist for future fork
support. CubePi v0.3 doesn't expose a fork API yet; the columns are
there so the schema is forward-compatible.
Common pitfalls
- CubepiSchemaUninitialized: your DB is empty or your migrations didn't run. Run the host application's Alembic upgrade first.
- CubepiSchemaMismatch: you upgraded cubepi but didn't generate a new migration. Generate one, apply it, and CubePi will start.
- Connection pool exhaustion under load: the default is max_pool_size=10. Bump it if your app's concurrent agent count is higher than that.
- asyncpg.exceptions.UndefinedTableError outside __aenter__: you're using the checkpointer outside of async with, so the pool isn't connected yet. Wrap the usage in the context manager.
- Mixing host SQLAlchemy MetaData: CubePi ships its own MetaData instance precisely so it can coexist with your app's models without colliding. Don't merge them into your global metadata; pass both to Alembic separately.
See also
- SQLite Checkpointing: single-process alternative.
- Custom Backends: Protocol details.
- Recipes / Postgres + FastAPI Service: a deployable HTTP-fronted agent.