cubepi.checkpointer
Checkpointer
class
Methods
load
load(thread_id: str) -> CheckpointData | None
append
append(thread_id: str, messages: list[Any]) -> None
save_extra
save_extra(thread_id: str, extra: dict[str, Any]) -> None
CheckpointData
class
CheckpointData(self, messages: list[Any] = list(), extra: dict[str, Any] = dict())
Attributes
messages:list[Any]extra:dict[str, Any]
MemoryCheckpointer
class
MemoryCheckpointer(self)
Methods
load
load(thread_id: str) -> CheckpointData | None
append
append(thread_id: str, messages: list[Any]) -> None
save_extra
save_extra(thread_id: str, extra: dict[str, Any]) -> None
PostgresCheckpointer
class
PostgresCheckpointer(self, dsn: str, *, min_pool_size: int = 1, max_pool_size: int = 10)
Checkpointer backed by PostgreSQL.
Usage
cp = PostgresCheckpointer(dsn="postgresql://...")
async with cp:
await cp.append(thread_id, [msg1, msg2])
data = await cp.load(thread_id)
await cp.save_extra(thread_id, {"k": "v"})
Raises CubepiSchemaUninitialized / CubepiSchemaMismatch at aenter if the DB schema isn't compatible with this cubepi version.
Methods
load
load(thread_id: str) -> CheckpointData | None
append
append(thread_id: str, messages: list[Message]) -> None
save_extra
save_extra(thread_id: str, extra: dict[str, Any]) -> None
SQLiteCheckpointer
class
SQLiteCheckpointer(self, db_path: str)
Methods
load
load(thread_id: str) -> CheckpointData | None
append
append(thread_id: str, messages: list[Any]) -> None
save_extra
save_extra(thread_id: str, extra: dict[str, Any]) -> None