Version: Next 🚧

cubepi.checkpointer

Checkpointer

class

Methods

load

load(thread_id: str) -> CheckpointData | None

append

append(thread_id: str, messages: list[Any]) -> None

save_extra

save_extra(thread_id: str, extra: dict[str, Any]) -> None

save_pending_request

save_pending_request(thread_id: str, request: Any) -> None

Persist the pending human-in-the-loop (HITL) request for a thread, or clear it if request is None.

All first-party implementations (Memory, SQLite, Postgres, MySQL) provide this method. Features that require HITL (Agent.respond, CheckpointedChannel) look it up with getattr(checkpointer, "save_pending_request", None), so they degrade gracefully when a checkpointer does not provide it.
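
The getattr lookup described above can be sketched roughly as follows. This is an illustration, not cubepi's actual code: LegacyCheckpointer, HitlCheckpointer, and persist_pending are hypothetical names, and the method is assumed to be a coroutine, as the database usage examples on this page suggest.

```python
import asyncio
from typing import Any


class LegacyCheckpointer:
    """Hypothetical checkpointer that lacks save_pending_request."""


class HitlCheckpointer:
    """Hypothetical checkpointer that implements save_pending_request."""

    def __init__(self) -> None:
        self.pending: dict[str, Any] = {}

    async def save_pending_request(self, thread_id: str, request: Any) -> None:
        if request is None:
            # None clears the pending request for the thread.
            self.pending.pop(thread_id, None)
        else:
            self.pending[thread_id] = request


async def persist_pending(checkpointer: Any, thread_id: str, request: Any) -> bool:
    """Persist the request if supported; return False when the method is missing."""
    save = getattr(checkpointer, "save_pending_request", None)
    if save is None:
        return False  # degrade gracefully: nothing is persisted
    await save(thread_id, request)
    return True
```

A caller can use the boolean result to decide whether a HITL-dependent feature should be offered at all for the given checkpointer.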

load_pending_request

load_pending_request(thread_id: str) -> Any

Load the persisted pending HITL request for a thread. Returns a HitlRequest instance, or None if no request is pending.

CheckpointData

class

CheckpointData(self, messages: list[Any] = list(), extra: dict[str, Any] = dict())

Attributes

  • messages: list[Any]
  • extra: dict[str, Any]
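
The list() and dict() defaults in the signature above suggest that each instance gets its own fresh containers. A minimal sketch of that behavior, assuming CheckpointData is a dataclass with default_factory fields (the page does not confirm this); CheckpointDataSketch is a stand-in name, not the real class.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CheckpointDataSketch:
    """Stand-in for CheckpointData (assumed to be a dataclass)."""

    messages: list[Any] = field(default_factory=list)
    extra: dict[str, Any] = field(default_factory=dict)


a = CheckpointDataSketch()
b = CheckpointDataSketch()
a.messages.append("hello")
# default_factory builds a new list per instance, so b.messages is unaffected.
```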

MemoryCheckpointer

class

MemoryCheckpointer(self)

Methods

load

load(thread_id: str) -> CheckpointData | None

append

append(thread_id: str, messages: list[Any]) -> None

save_extra

save_extra(thread_id: str, extra: dict[str, Any]) -> None

save_pending_request

save_pending_request(thread_id: str, request: Any) -> None

load_pending_request

load_pending_request(thread_id: str) -> Any
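
As an illustration of the load / append / save_extra contract shared by the checkpointers on this page, here is a minimal in-memory sketch. It is not cubepi's MemoryCheckpointer: InMemorySketch and DataSketch are stand-in names, the methods are assumed to be coroutines (as the database usage examples suggest), and save_extra is assumed to replace the stored dict rather than merge into it.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class DataSketch:
    """Stand-in for CheckpointData."""

    messages: list[Any] = field(default_factory=list)
    extra: dict[str, Any] = field(default_factory=dict)


class InMemorySketch:
    """Hypothetical in-memory checkpointer keyed by thread_id."""

    def __init__(self) -> None:
        self._threads: dict[str, DataSketch] = {}

    async def load(self, thread_id: str) -> Optional[DataSketch]:
        # Unknown threads yield None, matching the `CheckpointData | None` return type.
        return self._threads.get(thread_id)

    async def append(self, thread_id: str, messages: list[Any]) -> None:
        # Create the thread record on first use, then extend its message list.
        self._threads.setdefault(thread_id, DataSketch()).messages.extend(messages)

    async def save_extra(self, thread_id: str, extra: dict[str, Any]) -> None:
        # Assumption: the new dict replaces the old one instead of being merged.
        self._threads.setdefault(thread_id, DataSketch()).extra = dict(extra)


async def demo() -> Optional[DataSketch]:
    cp = InMemorySketch()
    await cp.append("t1", ["m1"])
    await cp.append("t1", ["m2"])
    await cp.save_extra("t1", {"k": "v"})
    return await cp.load("t1")
```

Running asyncio.run(demo()) returns the accumulated record for thread "t1"; loading a thread that was never written returns None.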

PostgresCheckpointer

class

PostgresCheckpointer(self, dsn: str, *, min_pool_size: int = 1, max_pool_size: int = 10)

Checkpointer backed by PostgreSQL.

Usage

cp = PostgresCheckpointer(dsn="postgresql://...")
async with cp:
    await cp.append(thread_id, [msg1, msg2])
    data = await cp.load(thread_id)
    await cp.save_extra(thread_id, {"k": "v"})

Raises CubepiSchemaUninitialized / CubepiSchemaMismatch in __aenter__ (that is, when the async with block is entered) if the DB schema isn't compatible with this cubepi version.
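
Because the schema check happens on entry to the async context manager, the exception has to be caught around the async with statement rather than around the constructor. The sketch below shows that pattern with stand-in classes only: SchemaUninitializedSketch and CheckedCheckpointerSketch are hypothetical and do not reproduce cubepi's implementation or connect to a database.

```python
import asyncio


class SchemaUninitializedSketch(Exception):
    """Stand-in for CubepiSchemaUninitialized."""


class CheckedCheckpointerSketch:
    """Hypothetical checkpointer that validates its schema on entry."""

    def __init__(self, schema_ready: bool) -> None:
        # In this sketch the constructor only stores state and never raises.
        self.schema_ready = schema_ready
        self.entered = False

    async def __aenter__(self) -> "CheckedCheckpointerSketch":
        if not self.schema_ready:
            raise SchemaUninitializedSketch("schema not initialized")
        self.entered = True
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.entered = False


async def try_open(schema_ready: bool) -> str:
    cp = CheckedCheckpointerSketch(schema_ready)
    try:
        async with cp:
            return "ok"
    except SchemaUninitializedSketch:
        # Raised before the body runs; __aexit__ is not called in this case.
        return "schema uninitialized"
```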

Methods

load

load(thread_id: str) -> CheckpointData | None

append

append(thread_id: str, messages: list[Message]) -> None

save_extra

save_extra(thread_id: str, extra: dict[str, Any]) -> None

save_pending_request

save_pending_request(thread_id: str, request) -> None

load_pending_request

load_pending_request(thread_id: str)

SQLiteCheckpointer

class

SQLiteCheckpointer(self, db_path: str)

Methods

load

load(thread_id: str) -> CheckpointData | None

append

append(thread_id: str, messages: list[Any]) -> None

save_extra

save_extra(thread_id: str, extra: dict[str, Any]) -> None

save_pending_request

save_pending_request(thread_id: str, request: Any) -> None

load_pending_request

load_pending_request(thread_id: str) -> Any

MySQLCheckpointer

class

MySQLCheckpointer(self, dsn: str, *, min_pool_size: int = 1, max_pool_size: int = 10)

Checkpointer backed by MySQL (8.0.13+, InnoDB).

Usage

cp = MySQLCheckpointer("mysql://user:pw@host:3306/db")
async with cp:
    await cp.append(thread_id, [msg1, msg2])
    data = await cp.load(thread_id)
    await cp.save_extra(thread_id, {"k": "v"})

Raises CubepiSchemaUninitialized / CubepiSchemaMismatch in __aenter__ (that is, when the async with block is entered) if the DB schema isn't compatible with this cubepi version.

Methods

load

load(thread_id: str) -> CheckpointData | None

append

append(thread_id: str, messages: list[Message]) -> None

save_extra

save_extra(thread_id: str, extra: dict[str, Any]) -> None

save_pending_request

save_pending_request(thread_id, request)

load_pending_request

load_pending_request(thread_id)
