Skip to main content
Version: 0.8

cubepi.checkpointer

Checkpointer

class

source

Methods

load

load(thread_id: str) -> CheckpointData | None

source

append

append(thread_id: str, messages: list[Message]) -> None

source

save_extra

save_extra(thread_id: str, extra: JsonObject) -> None

source

save_pending_request

save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None

Persist (or clear, if request is None) the pending HITL request for a thread.

First-party implementations (Memory, SQLite, Postgres, MySQL) all implement this. HITL-requiring features (Agent.respond, CheckpointedChannel) use getattr(checkpointer, "save_pending_request", None) for graceful degradation.

source

load_pending_request

load_pending_request(thread_id: str) -> HitlRequest | None

Load the persisted pending HITL request for a thread, or None.

Returns a HitlRequest instance or None.

source

snapshot

snapshot(thread_id: str, *, after_run_id: str) -> list[Message]

Return messages of completed runs of thread_id up through and including after_run_id, in source seq order. Raises ThreadNotFoundError or RunNotCompletedError.

source

fork

fork(src_thread_id: str, new_thread_id: str, *, after_run_id: str, metadata: JsonObject | None = None) -> None

Atomically physical-copy messages of completed runs up through after_run_id from src to new. See spec §3.2 / §3.4.

source

claim_run

claim_run(thread_id: str, run_id: str) -> None

Insert cubepi_runs row with claimed_at=now, completed_at=NULL. Lazily creates the threads row if needed. Raises RunAlreadyClaimedError or RunAlreadyCompletedError on PK conflict (distinguished by completed_at IS NULL/NOT NULL).

source

mark_run_complete

mark_run_complete(thread_id: str, run_id: str) -> None

Allocate next per-thread completion_seq; UPDATE the run row. Idempotent on already-completed rows (does NOT raise RunAlreadyCompletedError). Raises RunNotClaimedError when no row exists.

source

load_pending

load_pending(thread_id: str) -> tuple[HitlRequest, str | None] | None

Read (HitlRequest, run_id) atomically from the pending row, or None when no pending request exists.

source

CheckpointData

class

CheckpointData(self, messages: list[Message] = list(), extra: JsonObject = dict(), parent_thread_id: str | None = None)

source

Attributes

  • messages: list[Message]
  • extra: JsonObject
  • parent_thread_id: str | None

MemoryCheckpointer

class

MemoryCheckpointer(self)

source

Methods

load

load(thread_id: str) -> CheckpointData | None

source

append

append(thread_id: str, messages: list[Message]) -> None

source

save_extra

save_extra(thread_id: str, extra: JsonObject) -> None

source

save_pending_request

save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None

source

load_pending_request

load_pending_request(thread_id: str) -> HitlRequest | None

source

load_pending_run_id

load_pending_run_id(thread_id: str) -> str | None

source

load_pending

load_pending(thread_id: str) -> tuple[HitlRequest, str | None] | None

source

claim_run

claim_run(thread_id: str, run_id: str) -> None

source

mark_run_complete

mark_run_complete(thread_id: str, run_id: str) -> None

source

snapshot

snapshot(thread_id: str, *, after_run_id: str) -> list[Message]

source

fork

fork(src_thread_id: str, new_thread_id: str, *, after_run_id: str, metadata: JsonObject | None = None) -> None

source

PostgresCheckpointer

class

PostgresCheckpointer(self, dsn: str, *, min_pool_size: int = 1, max_pool_size: int = 10)

Checkpointer backed by PostgreSQL.

Usage

cp = PostgresCheckpointer(dsn="postgresql://...")
async with cp:
await cp.append(thread_id, [msg1, msg2])
data = await cp.load(thread_id)
await cp.save_extra(thread_id, {"k": "v"})

Raises CubepiSchemaUninitialized / CubepiSchemaMismatch at aenter if the DB schema isn't compatible with this cubepi version.

source

Methods

load

load(thread_id: str) -> CheckpointData | None

source

append

append(thread_id: str, messages: list[Message]) -> None

source

claim_run

claim_run(thread_id: str, run_id: str) -> None

source

mark_run_complete

mark_run_complete(thread_id: str, run_id: str) -> None

source

load_pending

load_pending(thread_id: str) -> tuple[HitlRequest, str | None] | None

source

snapshot

snapshot(thread_id: str, *, after_run_id: str) -> list[Message]

source

fork

fork(src_thread_id: str, new_thread_id: str, *, after_run_id: str, metadata: JsonObject | None = None) -> None

source

save_extra

save_extra(thread_id: str, extra: JsonObject) -> None

source

save_pending_request

save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None

Persist a pending HITL request and its owning run_id.

When request is None, both pending_request and run_id are cleared; the run_id kwarg is ignored in that case (the pending row's run_id is always cleared alongside the pending).

pending and run_id are set in ONE UPDATE; the surrounding transaction also covers the lazy thread INSERT.

source

load_pending_request

load_pending_request(thread_id: str) -> HitlRequest | None

source

load_pending_run_id

load_pending_run_id(thread_id: str) -> str | None

Return the run_id of the currently pending HITL request.

Filters on pending_request IS NOT NULL so the result reflects a real pending, not a leftover run_id from a cleared row. Returns None when: the thread is unknown, has no pending request, or was written by a pre-v3 host (legacy rows have run_id NULL).

source

SQLiteCheckpointer

class

SQLiteCheckpointer(self, db_path: str)

source

Methods

load

load(thread_id: str) -> CheckpointData | None

source

append

append(thread_id: str, messages: list[Message]) -> None

source

save_extra

save_extra(thread_id: str, extra: JsonObject) -> None

source

save_pending_request

save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None

source

claim_run

claim_run(thread_id: str, run_id: str) -> None

source

mark_run_complete

mark_run_complete(thread_id: str, run_id: str) -> None

source

load_pending

load_pending(thread_id: str) -> tuple[HitlRequest, str | None] | None

source

load_pending_request

load_pending_request(thread_id: str) -> HitlRequest | None

source

load_pending_run_id

load_pending_run_id(thread_id: str) -> str | None

source

snapshot

snapshot(thread_id: str, *, after_run_id: str) -> list[Message]

source

fork

fork(src_thread_id: str, new_thread_id: str, *, after_run_id: str, metadata: JsonObject | None = None) -> None

source

MySQLCheckpointer

class

MySQLCheckpointer(self, dsn: str, *, min_pool_size: int = 1, max_pool_size: int = 10)

Checkpointer backed by MySQL (8.0.13+, InnoDB).

Usage

cp = MySQLCheckpointer("mysql://user:pw@host:3306/db")
async with cp:
await cp.append(thread_id, [msg1, msg2])
data = await cp.load(thread_id)
await cp.save_extra(thread_id, {"k": "v"})

Raises CubepiSchemaUninitialized / CubepiSchemaMismatch at aenter if the DB schema isn't compatible with this cubepi version.

source

Methods

load

load(thread_id: str) -> CheckpointData | None

source

append

append(thread_id: str, messages: list[Message]) -> None

source

claim_run

claim_run(thread_id: str, run_id: str) -> None

Atomically claim a run_id on a thread.

Lazy-creates the cubepi_threads row, takes a per-thread FOR UPDATE lock to serialize concurrent claims for the same thread, then checks cubepi_runs for an existing row before inserting. The pre-check matches the Postgres approach — using INSERT + catching IntegrityError would also work, but a pre-SELECT lets us distinguish in-flight vs completed cleanly without relying on the error-class taxonomy.

source

mark_run_complete

mark_run_complete(thread_id: str, run_id: str) -> None

Mark (thread_id, run_id) complete with a monotonic completion_seq.

Idempotent: a second call on an already-completed row is a no-op. Raises RunNotClaimedError if no claim row exists. The completion_seq is the per-thread MAX(completion_seq) + 1, allocated under the same per-thread FOR UPDATE fence as claim_run/append.

source

load_pending

load_pending(thread_id: str) -> tuple[HitlRequest, str | None] | None

Return the thread's pending HITL request + the owning run_id.

Single SELECT covers both columns (vs. load_pending_request + load_pending_run_id, which incur two round trips and can race across a clear/set window). Returns None when no pending exists.

source

snapshot

snapshot(thread_id: str, *, after_run_id: str) -> list[Message]

Return the message slice that fork() would copy, without writing.

Includes legacy messages with NULL run_id plus messages whose run_id belongs to a completed run at or before after_run_id's completion cutoff. Raises ThreadNotFoundError if the source thread row is absent, or RunNotCompletedError if after_run_id has not yet been completion-marked.

source

fork

fork(src_thread_id: str, new_thread_id: str, *, after_run_id: str, metadata: JsonObject | None = None) -> None

Copy completed prefix of src_thread_id into new_thread_id.

Threads-row first: the destination cubepi_threads row is inserted before the message + runs copies so callers see a complete view if they probe mid-fork. There is no cubepi_runs -> cubepi_threads FK on MySQL (partition limitation), but we keep the same ordering for semantic parity with Postgres.

Per-thread serialization: a SELECT … FOR UPDATE on the source thread row fences concurrent forks/appends — without it, an append racing the fork could leak a partial in-flight run into the destination.

source

save_extra

save_extra(thread_id: str, extra: JsonObject) -> None

source

save_pending_request

save_pending_request(thread_id: str, request: HitlRequest | None, *, run_id: str | None = None) -> None

Persist a pending HITL request and its owning run_id.

When request is None, both pending_request and run_id are cleared; the run_id kwarg is ignored in that case (the pending row's run_id is always cleared alongside the pending).

pending and run_id are set in ONE UPDATE; the surrounding explicit transaction also covers the lazy thread INSERT.

source

load_pending_request

load_pending_request(thread_id: str) -> HitlRequest | None

source

load_pending_run_id

load_pending_run_id(thread_id: str) -> str | None

Return the run_id of the currently pending HITL request.

Filters on pending_request IS NOT NULL so the result reflects a real pending, not a leftover run_id from a cleared row. Returns None when: the thread is unknown, has no pending request, or was written by a pre-v3 host (legacy rows have run_id NULL).

source