cubepi.checkpointer
Checkpointer
class
Methods
load
load(thread_id: str) -> CheckpointData | None
append
append(thread_id: str, messages: list[Message]) -> None
save_extra
save_extra(thread_id: str, extra: JsonObject) -> None
save_pending_request
save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None
Persist (or clear, if request is None) the pending HITL request for a thread.
First-party implementations (Memory, SQLite, Postgres, MySQL) all implement this.
Features that require HITL (Agent.respond, CheckpointedChannel) look the method up with
getattr(checkpointer, "save_pending_request", None), so they degrade gracefully on checkpointers that do not implement it.
load_pending_request
load_pending_request(thread_id: str) -> HitlRequest | None
Load the persisted pending HITL request for a thread.
Returns a HitlRequest instance, or None if no request is pending.
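The getattr lookup described under save_pending_request can be sketched as follows. This is a minimal illustration under stated assumptions, not cubepi's own code: CoreOnlyCheckpointer, HitlCapableCheckpointer, and persist_pending_if_supported are hypothetical names, and a plain dict stands in for HitlRequest.

```python
import asyncio


class CoreOnlyCheckpointer:
    """Hypothetical third-party checkpointer with no HITL support."""

    async def append(self, thread_id, messages):
        pass  # message storage is irrelevant to this sketch


class HitlCapableCheckpointer(CoreOnlyCheckpointer):
    """Hypothetical checkpointer that also stores pending requests."""

    def __init__(self):
        self.pending = {}

    async def save_pending_request(self, thread_id, request, *, run_id=None):
        if request is None:
            # Clearing: drop the stored request and its run_id together.
            self.pending.pop(thread_id, None)
        else:
            self.pending[thread_id] = (request, run_id)


async def persist_pending_if_supported(checkpointer, thread_id, request, run_id=None):
    """Return True if the request was persisted, False if unsupported."""
    save = getattr(checkpointer, "save_pending_request", None)
    if save is None:
        return False  # degrade gracefully: the caller can skip HITL persistence
    await save(thread_id, request, run_id=run_id)
    return True
```

A caller can branch on the returned flag instead of catching AttributeError, which keeps the HITL path optional for custom checkpointers.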
CheckpointData
class
CheckpointData(self, messages: list[Message] = list(), extra: JsonObject = dict())
Attributes
messages: list[Message]
extra: JsonObject
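Because load returns CheckpointData | None, callers usually need to treat an unknown thread as an empty history. The sketch below uses a stand-in dataclass rather than the real class: CheckpointDataSketch and history_or_empty are hypothetical names, and the per-instance default containers are an assumption inferred from the list() and dict() defaults in the signature.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CheckpointDataSketch:
    """Stand-in mirroring the two documented attributes (hypothetical)."""

    # default_factory gives each instance its own containers (assumed behavior).
    messages: list = field(default_factory=list)
    extra: dict = field(default_factory=dict)


def history_or_empty(data: Optional[CheckpointDataSketch]) -> list:
    """Return a copy of the stored messages, or [] for an unknown thread."""
    return list(data.messages) if data is not None else []
```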
MemoryCheckpointer
class
MemoryCheckpointer(self)
Methods
load
load(thread_id: str) -> CheckpointData | None
append
append(thread_id: str, messages: list[Message]) -> None
save_extra
save_extra(thread_id: str, extra: JsonObject) -> None
save_pending_request
save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None
load_pending_request
load_pending_request(thread_id: str) -> HitlRequest | None
load_pending_run_id
load_pending_run_id(thread_id: str) -> str | None
PostgresCheckpointer
class
PostgresCheckpointer(self, dsn: str, *, min_pool_size: int = 1, max_pool_size: int = 10)
Checkpointer backed by PostgreSQL.
Usage
cp = PostgresCheckpointer(dsn="postgresql://...")
async with cp:
    await cp.append(thread_id, [msg1, msg2])
    data = await cp.load(thread_id)
    await cp.save_extra(thread_id, {"k": "v"})
Raises CubepiSchemaUninitialized or CubepiSchemaMismatch on entering the async context (__aenter__) if the database schema is not compatible with this cubepi version.
Methods
load
load(thread_id: str) -> CheckpointData | None
append
append(thread_id: str, messages: list[Message]) -> None
save_extra
save_extra(thread_id: str, extra: JsonObject) -> None
save_pending_request
save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None
Persist a pending HITL request together with its owning run_id.
When request is None, both pending_request and run_id are cleared
and the run_id keyword argument is ignored, so a cleared row never
retains a run_id.
pending_request and run_id are written in a single UPDATE; the surrounding transaction also covers the lazy thread INSERT.
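The write path described above (lazy thread INSERT plus a single UPDATE inside one transaction) can be sketched with the standard-library sqlite3 module standing in for PostgreSQL. The threads table, its thread_id column, the JSON encoding of the request, and the helper names are assumptions made for illustration; only the pending_request and run_id column names come from the text.

```python
import json
import sqlite3


def make_db():
    """Create an in-memory database with a hypothetical threads table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE threads ("
        "thread_id TEXT PRIMARY KEY, pending_request TEXT, run_id TEXT)"
    )
    return conn


def save_pending(conn, thread_id, request, run_id=None):
    """Set or clear the pending request and run_id in one UPDATE."""
    payload = None if request is None else json.dumps(request)
    if request is None:
        run_id = None  # clearing ignores whatever run_id was passed
    with conn:  # one transaction: commits on success, rolls back on error
        # Lazily create the thread row if it does not exist yet.
        conn.execute(
            "INSERT OR IGNORE INTO threads (thread_id) VALUES (?)", (thread_id,)
        )
        # Both columns change in the same statement, so readers never see
        # a pending request paired with another request's run_id.
        conn.execute(
            "UPDATE threads SET pending_request = ?, run_id = ? WHERE thread_id = ?",
            (payload, run_id, thread_id),
        )
```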
load_pending_request
load_pending_request(thread_id: str) -> HitlRequest | None
load_pending_run_id
load_pending_run_id(thread_id: str) -> str | None
Return the run_id of the currently pending HITL request.
The query filters on pending_request IS NOT NULL, so the result reflects
a request that is actually pending rather than a leftover run_id from a
cleared row. Returns None when the thread is unknown, when it has no pending
request, or when the row was written by a pre-v3 host (legacy rows have a NULL run_id).
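The three None cases listed above follow directly from the IS NOT NULL filter. A minimal sketch, again using sqlite3 as a stand-in for PostgreSQL: the threads table layout and the pending_run_id helper are hypothetical, and only the pending_request and run_id column names are taken from the description.

```python
import sqlite3


def make_db(rows):
    """Build an in-memory table of (thread_id, pending_request, run_id) rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE threads ("
        "thread_id TEXT PRIMARY KEY, pending_request TEXT, run_id TEXT)"
    )
    conn.executemany("INSERT INTO threads VALUES (?, ?, ?)", rows)
    return conn


def pending_run_id(conn, thread_id):
    """Return run_id only when the thread has a pending request."""
    row = conn.execute(
        "SELECT run_id FROM threads "
        "WHERE thread_id = ? AND pending_request IS NOT NULL",
        (thread_id,),
    ).fetchone()
    # No row: unknown thread or no pending request. A row whose run_id is
    # NULL (a legacy row) also yields None.
    return row[0] if row else None
```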
SQLiteCheckpointer
class
SQLiteCheckpointer(self, db_path: str)
Methods
load
load(thread_id: str) -> CheckpointData | None
append
append(thread_id: str, messages: list[Message]) -> None
save_extra
save_extra(thread_id: str, extra: JsonObject) -> None
save_pending_request
save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None
load_pending_request
load_pending_request(thread_id: str) -> HitlRequest | None
load_pending_run_id
load_pending_run_id(thread_id: str) -> str | None
MySQLCheckpointer
class
MySQLCheckpointer(self, dsn: str, *, min_pool_size: int = 1, max_pool_size: int = 10)
Checkpointer backed by MySQL (8.0.13+, InnoDB).
Usage
cp = MySQLCheckpointer("mysql://user:pw@host:3306/db")
async with cp:
    await cp.append(thread_id, [msg1, msg2])
    data = await cp.load(thread_id)
    await cp.save_extra(thread_id, {"k": "v"})
Raises CubepiSchemaUninitialized or CubepiSchemaMismatch on entering the async context (__aenter__) if the database schema is not compatible with this cubepi version.
Methods
load
load(thread_id: str) -> CheckpointData | None
append
append(thread_id: str, messages: list[Message]) -> None
save_extra
save_extra(thread_id: str, extra: JsonObject) -> None
save_pending_request
save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None
Persist a pending HITL request together with its owning run_id.
When request is None, both pending_request and run_id are cleared
and the run_id keyword argument is ignored, so a cleared row never
retains a run_id.
pending_request and run_id are written in a single UPDATE; the surrounding explicit transaction also covers the lazy thread INSERT.
load_pending_request
load_pending_request(thread_id: str) -> HitlRequest | None
load_pending_run_id
load_pending_run_id(thread_id: str) -> str | None
Return the run_id of the currently pending HITL request.
The query filters on pending_request IS NOT NULL, so the result reflects
a request that is actually pending rather than a leftover run_id from a
cleared row. Returns None when the thread is unknown, when it has no pending
request, or when the row was written by a pre-v3 host (legacy rows have a NULL run_id).