Core Concepts
Six core concepts cover everything CubePi does; HITL and tracing build on top of them. Read this page once, and the rest of the docs become a lookup table.
Agent
Agent is the stateful façade: you construct it with a provider, a
model, optional tools, and optional middleware/checkpointer. You drive
it through three kinds of call:
- await agent.prompt(message): start a new turn from a user message.
- await agent.resume(): continue from the last persisted message (used with a checkpointer).
- agent.steer(message) / agent.follow_up(message): queue a message mid-flight or after the current run.
The agent owns an AgentState (system prompt, tools, model, message
history, pending tool calls, streaming flag) and a list of subscribers:
```python
unsubscribe = agent.subscribe(my_listener)
# ... later, when the listener is no longer needed:
unsubscribe()
```
Subscribers receive every AgentEvent the loop emits. They can be
sync or async.
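The subscribe/unsubscribe contract is easy to picture with a stand-alone registry. This is a minimal sketch, not CubePi's implementation: EventBus, emit, and the dict-shaped events are hypothetical names chosen for illustration. It shows how subscribe can hand back an unsubscribe callable, and how one dispatcher can serve both sync and async listeners by awaiting only the results that are awaitable.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Union

# Hypothetical listener type: a plain function or a coroutine function.
Listener = Callable[[Any], Union[None, Awaitable[None]]]


class EventBus:
    """Illustrative stand-in for the agent's subscriber list (not CubePi code)."""

    def __init__(self) -> None:
        self._listeners: list[Listener] = []

    def subscribe(self, listener: Listener) -> Callable[[], None]:
        self._listeners.append(listener)

        def unsubscribe() -> None:
            # Calling it twice is a no-op rather than an error.
            if listener in self._listeners:
                self._listeners.remove(listener)

        return unsubscribe

    async def emit(self, event: Any) -> None:
        # Iterate over a copy so listeners may unsubscribe during dispatch.
        for listener in list(self._listeners):
            result = listener(event)
            if inspect.isawaitable(result):
                await result  # async listeners are awaited in order


async def main() -> list[str]:
    seen: list[str] = []
    bus = EventBus()

    def sync_listener(event: dict) -> None:
        seen.append(f"sync:{event['type']}")

    async def async_listener(event: dict) -> None:
        await asyncio.sleep(0)
        seen.append(f"async:{event['type']}")

    bus.subscribe(sync_listener)
    unsubscribe = bus.subscribe(async_listener)
    await bus.emit({"type": "agent_start"})
    unsubscribe()
    await bus.emit({"type": "agent_end"})
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Awaiting each async listener in turn keeps delivery ordered; scheduling them concurrently would be the other option, and this sketch does not claim which one CubePi uses.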
Tool
An AgentTool is a name + description + Pydantic parameter model +
async execute function:
```python
from pydantic import BaseModel

from cubepi import AgentTool, AgentToolResult, TextContent


class SearchParams(BaseModel):
    query: str
    limit: int = 10


async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
    # Do the work here; respect `signal` if the call can be aborted.
    return AgentToolResult(content=[TextContent(text=f"…")])


search = AgentTool(
    name="search",
    description="Search the corpus",
    parameters=SearchParams,
    execute=execute,
)
```
The Pydantic schema is auto-converted to JSON Schema and passed to the
model. Arg parsing, error wrapping, and parallel execution are
handled by the framework. See Tool Use
for execution_mode, on_update (incremental progress), and
terminate (end the turn from a tool).
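To make "error wrapping and parallel execution" concrete, here is a minimal asyncio sketch of the idea. ToolResult, run_tool_calls, and the tuple-shaped calls are hypothetical stand-ins rather than CubePi's API, and the plain argument dicts skip the Pydantic validation step. Several tool calls run concurrently, and an exception in one becomes an error result instead of aborting the whole batch.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class ToolResult:
    # Hypothetical result shape, standing in for AgentToolResult.
    tool_call_id: str
    text: str
    is_error: bool = False


ToolFn = Callable[[dict], Awaitable[str]]


async def _run_one(tool_call_id: str, fn: ToolFn, args: dict) -> ToolResult:
    try:
        return ToolResult(tool_call_id, await fn(args))
    except Exception as exc:  # wrap the failure so the model can see it
        return ToolResult(tool_call_id, f"{type(exc).__name__}: {exc}", is_error=True)


async def run_tool_calls(calls: list[tuple[str, ToolFn, dict]]) -> list[ToolResult]:
    # gather() runs the calls concurrently and keeps results in call order.
    return await asyncio.gather(*(_run_one(cid, fn, args) for cid, fn, args in calls))


async def search(args: dict) -> str:
    await asyncio.sleep(0.01)  # pretend to hit a backend
    return f"{args.get('limit', 10)} hits for {args['query']!r}"


async def broken(args: dict) -> str:
    raise ValueError("backend unavailable")


if __name__ == "__main__":
    results = asyncio.run(run_tool_calls([
        ("call_1", search, {"query": "cubepi"}),
        ("call_2", broken, {}),
    ]))
    for r in results:
        print(r)
```

Returning errors as results, rather than raising, lets the model see what went wrong and decide how to recover on the next turn.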
Provider
A Provider is anything matching this Protocol:
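```python
from __future__ import annotations  # Model, Message, etc. are CubePi types

from typing import Protocol


class Provider(Protocol):
    async def stream(
        self,
        model: Model,
        messages: list[Message],
        *,
        system_prompt: str = "",
        tools: list[ToolDefinition] | None = None,
        options: StreamOptions | None = None,
    ) -> MessageStream: ...
```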
It returns a MessageStream: a single async iterator that yields
StreamEvents and exposes the final AssistantMessage via await stream.result(). Built-in providers:
- AnthropicProvider: Claude (Messages API, with thinking, caching, tool use).
- OpenAIProvider: GPT family (Chat Completions API).
- OpenAIResponsesProvider: GPT family (Responses API, server-side state).
- FauxProvider: deterministic test double (no network).
Write your own by implementing one method. See Providers / Custom.
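The "one async iterator plus await stream.result()" shape can be illustrated without any network. ToyMessageStream is hypothetical: it uses dicts in place of StreamEvents and a plain string in place of the AssistantMessage, and it is not the class a custom provider should actually return (see Providers / Custom for that). It only shows how one object can be iterated for events and then awaited for the final result.

```python
import asyncio
from typing import AsyncIterator


class ToyMessageStream:
    """Hypothetical stand-in for MessageStream: iterate events, then await result()."""

    def __init__(self, chunks: list[str]) -> None:
        self._chunks = chunks
        self._done = asyncio.Event()
        self._final = ""

    def __aiter__(self) -> AsyncIterator[dict]:
        return self._events()

    async def _events(self) -> AsyncIterator[dict]:
        yield {"type": "start"}
        yield {"type": "text_start"}
        parts: list[str] = []
        for chunk in self._chunks:
            await asyncio.sleep(0)  # simulate tokens arriving over time
            parts.append(chunk)
            yield {"type": "text_delta", "delta": chunk}
        yield {"type": "text_end"}
        self._final = "".join(parts)
        self._done.set()
        yield {"type": "done"}

    async def result(self) -> str:
        # Resolves once the iterator has assembled the final message.
        await self._done.wait()
        return self._final


async def main() -> tuple[list[str], str]:
    stream = ToyMessageStream(["Hel", "lo"])
    types = [event["type"] async for event in stream]
    return types, await stream.result()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this toy version result() resolves only after the events have been consumed; the sketch makes no assumption about whether the real MessageStream buffers in the background.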
Stream and events
Streams and events are two layers:
- Provider streams: MessageStream yields provider events (start, text_start, text_delta, text_end, thinking_*, toolcall_*, done, error). This is the raw token stream.
- Agent events: what agent.subscribe(...) receives. 14 types cover the entire loop plus HITL: agent_start, agent_end, turn_start, turn_end, message_start, message_update, message_end, tool_execution_start, tool_execution_update, tool_execution_end, hitl_request, hitl_answer, agent_suspended, agent_aborted.
Subscribe to agent events to drive a UI; for low-level token routing, dig into
event.stream_event. See Streaming Events.
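A UI subscriber typically branches on the event type and reaches into event.stream_event only for token-level work. The sketch below assumes a hypothetical Event shape (type, stream_event as a dict, tool_name) and assumes that message_update events wrap a provider text_delta; check Streaming Events for the real AgentEvent fields before relying on it. Since subscribers may be sync, a plain callable like ConsoleView is the kind of object agent.subscribe(...) accepts.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Event:
    # Hypothetical event shape; CubePi's AgentEvent fields may differ.
    type: str
    stream_event: Optional[dict] = None
    tool_name: Optional[str] = None


@dataclass
class ConsoleView:
    """A sync subscriber that turns agent events into simple UI state."""

    text: list[str] = field(default_factory=list)
    status: list[str] = field(default_factory=list)

    def __call__(self, event: Event) -> None:
        if event.type == "message_update" and event.stream_event is not None:
            # Low-level routing: look inside the wrapped provider event.
            raw = event.stream_event
            if raw.get("type") == "text_delta":
                self.text.append(raw["delta"])
        elif event.type == "tool_execution_start":
            self.status.append(f"running {event.tool_name}")
        elif event.type == "tool_execution_end":
            self.status.append(f"finished {event.tool_name}")
        elif event.type == "agent_end":
            self.status.append("done")


if __name__ == "__main__":
    view = ConsoleView()
    for e in [
        Event("agent_start"),
        Event("message_update", {"type": "text_delta", "delta": "Hi"}),
        Event("message_update", {"type": "text_delta", "delta": "!"}),
        Event("tool_execution_start", tool_name="search"),
        Event("tool_execution_end", tool_name="search"),
        Event("agent_end"),
    ]:
        view(e)
    print("".join(view.text), view.status)
```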
Middleware
Middleware is a class with up to seven typed hooks:
| Hook | When it runs | Composition rule |
|---|---|---|
| transform_context | Before each model call, on the message list | Chained: each receives the previous result |
| convert_to_llm | Right before serialisation to the provider | Last implementation wins |
| transform_system_prompt | Before each model call, on the system prompt | Chained |
| before_tool_call | Per tool call, after argument validation | First block=True short-circuits |
| after_tool_call | Per tool call, after execute | Later middleware overrides earlier |
| after_model_response | After the assistant message lands | Returns a TurnAction that controls flow |
| should_stop_after_turn | At each turn boundary | Any True stops |
Pass middleware as a list to Agent(middleware=[...]). See
Middleware / Composition.
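Three of the composition rules in the table can be read as small folding strategies. This sketch collapses the hooks to synchronous methods with simplified arguments and a hypothetical BlockDecision type, so it is not CubePi's Middleware base class; it only shows chaining (transform_context), first-block short-circuiting (before_tool_call), and any-True stopping (should_stop_after_turn).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BlockDecision:
    # Hypothetical stand-in for a before_tool_call return value.
    block: bool = False
    reason: str = ""


class Middleware:
    """Sketch base class: every hook is optional and defaults to a no-op."""

    def transform_context(self, messages: list[str]) -> list[str]:
        return messages

    def before_tool_call(self, tool_name: str) -> Optional[BlockDecision]:
        return None

    def should_stop_after_turn(self, turn: int) -> bool:
        return False


def run_transform_context(mws: list[Middleware], messages: list[str]) -> list[str]:
    # Chained: each middleware receives the previous one's output.
    for mw in mws:
        messages = mw.transform_context(messages)
    return messages


def run_before_tool_call(mws: list[Middleware], tool_name: str) -> Optional[BlockDecision]:
    # First block=True short-circuits; later middleware is not consulted.
    for mw in mws:
        decision = mw.before_tool_call(tool_name)
        if decision is not None and decision.block:
            return decision
    return None


def run_should_stop(mws: list[Middleware], turn: int) -> bool:
    # Any True stops the loop.
    return any(mw.should_stop_after_turn(turn) for mw in mws)


class KeepLastTwo(Middleware):
    def transform_context(self, messages):
        return messages[-2:]


class Tagger(Middleware):
    def transform_context(self, messages):
        return [f"[ctx] {m}" for m in messages]


class NoBash(Middleware):
    def before_tool_call(self, tool_name):
        return BlockDecision(block=True, reason="bash disabled") if tool_name == "bash" else None


class MaxThreeTurns(Middleware):
    def should_stop_after_turn(self, turn):
        return turn >= 3


if __name__ == "__main__":
    stack = [KeepLastTwo(), Tagger(), NoBash(), MaxThreeTurns()]
    print(run_transform_context(stack, ["a", "b", "c"]))
    print(run_before_tool_call(stack, "bash"), run_should_stop(stack, 3))
```

Order matters for chained hooks: swapping KeepLastTwo and Tagger here yields the same tags on the last two messages, but a middleware that inspects tags would see different input.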
Checkpointer
A Checkpointer is anything matching:
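```python
from __future__ import annotations  # CheckpointData and Message are CubePi types
from typing import Protocol

class Checkpointer(Protocol):
    async def load(self, thread_id: str) -> CheckpointData | None: ...
    async def append(self, thread_id: str, messages: list[Message]) -> None: ...
    async def save_extra(self, thread_id: str, extra: dict) -> None: ...
```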
Bind one to an agent with Agent(checkpointer=cp, thread_id="…") and
the loop will append each new message as it lands, restoring history
on the first prompt(). Built-in backends: MemoryCheckpointer,
SQLiteCheckpointer, PostgresCheckpointer, MySQLCheckpointer.
HITL adds two optional methods for cross-process suspend/resume:
save_pending_request / load_pending_request. All first-party
backends implement them. See the HITL guide.
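Because Checkpointer is a Protocol, any object with matching async methods qualifies. The toy backend below is a hypothetical sketch: DictCheckpointer and this CheckpointData dataclass are illustrative stand-ins (in practice, use the built-in MemoryCheckpointer), messages are plain dicts, save_extra is assumed to merge keys, and the optional HITL methods are omitted because their signatures are not shown on this page.

```python
import asyncio
import copy
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class CheckpointData:
    # Hypothetical shape: the real CheckpointData may carry more fields.
    messages: list[Any] = field(default_factory=list)
    extra: dict = field(default_factory=dict)


class DictCheckpointer:
    """Toy backend with the three-method Checkpointer shape (not a CubePi class)."""

    def __init__(self) -> None:
        # No awaits inside the methods, so each call is atomic on one event loop.
        self._threads: dict[str, CheckpointData] = {}

    async def load(self, thread_id: str) -> Optional[CheckpointData]:
        data = self._threads.get(thread_id)
        # Hand back a copy so callers cannot mutate stored history in place.
        return copy.deepcopy(data) if data is not None else None

    async def append(self, thread_id: str, messages: list[Any]) -> None:
        self._threads.setdefault(thread_id, CheckpointData()).messages.extend(messages)

    async def save_extra(self, thread_id: str, extra: dict) -> None:
        # Assumption: new keys are merged into any existing extras.
        self._threads.setdefault(thread_id, CheckpointData()).extra.update(extra)


async def demo() -> Optional[CheckpointData]:
    cp = DictCheckpointer()
    assert await cp.load("t1") is None  # unknown thread: nothing to restore
    await cp.append("t1", [{"role": "user", "content": "hi"}])
    await cp.append("t1", [{"role": "assistant", "content": "hello"}])
    await cp.save_extra("t1", {"title": "greeting"})
    return await cp.load("t1")


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

An append-only API fits the loop's behaviour of persisting each message as it lands: a crash loses at most the message in flight, not the whole history.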
HITL (Human-in-the-Loop)
CubePi ships a built-in cubepi.hitl module for scenarios where the
agent needs to pause and wait for a human:
- Sandbox confirmation โ a dangerous tool (bash, file write) needs approve / deny / edit before running.
- Mid-run questions โ the agent surfaces a structured form to the user and waits for the answer.
```python
from cubepi.hitl import InMemoryChannel, ConfirmToolCallMiddleware, ask_user_tool

channel = InMemoryChannel()
agent = Agent(
    provider=…, model=…,
    tools=[bash_tool, ask_user_tool(channel)],
    middleware=[ConfirmToolCallMiddleware(channel, require_confirm={"bash"})],
    channel=channel,
)
```
The channel is an awaitable collaborator: tool and
middleware authors write await channel.ask(...) or
await channel.confirm(...), and the channel handles the suspension. Host
code (your web app or TUI) subscribes via channel.subscribe() or polls
channel.pending, renders the request to the user, and posts the
answer with channel.answer(qid, answer).
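The ask/answer handshake can be modelled with an asyncio future: the asking coroutine awaits it, and host code resolves it. ToyChannel, risky_tool, and host are hypothetical names for a single-process sketch in the spirit of InMemoryChannel, not its actual code; confirm is folded into a plain ask, and nothing is persisted.

```python
import asyncio
import itertools
from typing import Any


class ToyChannel:
    """Hypothetical single-process channel: ask() suspends until answer() is called."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.pending: dict[str, tuple[str, asyncio.Future]] = {}

    async def ask(self, question: str) -> Any:
        qid = f"q{next(self._ids)}"
        future = asyncio.get_running_loop().create_future()
        self.pending[qid] = (question, future)
        try:
            return await future  # the tool or middleware is suspended here
        finally:
            self.pending.pop(qid, None)

    def answer(self, qid: str, answer: Any) -> None:
        _question, future = self.pending[qid]
        if not future.done():
            future.set_result(answer)


async def risky_tool(channel: ToyChannel) -> str:
    # Stands in for confirmation-gating a dangerous call.
    approved = await channel.ask("Run `rm -rf build/`?")
    return "ran" if approved == "approve" else "skipped"


async def host(channel: ToyChannel) -> None:
    # Host code polls .pending, would render the question, then posts an answer.
    while not channel.pending:
        await asyncio.sleep(0)
    qid, (_question, _future) = next(iter(channel.pending.items()))
    channel.answer(qid, "approve")


async def main() -> str:
    channel = ToyChannel()
    result, _ = await asyncio.gather(risky_tool(channel), host(channel))
    assert not channel.pending  # the request is cleared once answered
    return result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

A future only lives as long as its process, which is why the cross-process case needs the pending request to be persisted instead.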
Two channel backends ship in the box:
- InMemoryChannel: single process (CLI, notebook, tests).
- CheckpointedChannel: cross-process (web service). The pending request is persisted to the checkpointer; a different process can answer hours later via Agent.respond(question_id=, answer=).
Full details (the three HITL verbs, both built-in middlewares, the cross-process suspend/resume protocol, events, trace spans, and the error reference) are in the HITL guide.
Tracer (optional)
Tracer produces OpenTelemetry spans aligned with the
GenAI Semantic Conventions,
so any OTLP backend (Jaeger, Tempo, Honeycomb, Datadog, AWS X-Ray, …)
can ingest agent runs without custom instrumentation. Install the
extra:
```shell
pip install "cubepi[tracing]"       # OTel SDK
pip install "cubepi[tracing-otlp]"  # + OTLP/HTTP exporter
```
then wrap your agent in an async with block:
```python
from cubepi.tracing import Tracer
from cubepi.tracing.exporters import JsonlSpanExporter

# Parenthesised context managers require Python 3.10+.
async with (
    Tracer(
        service_name="my-bot",
        agent_name="assistant",
        exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
    ) as tracer,
    tracer.attached(agent),
):
    await agent.prompt("…")
```
Each run emits an invoke_agent root span containing one
cubepi.turn per LLM round-trip, plus chat (CLIENT) and
execute_tool children. By default no prompt content or model
output is recorded โ opt in with Tracer(record_content=True) and
a redact callback for PII. Pair with Meter(...) for token /
duration / TTFC histograms. Full guide:
Tracing / Overview.
Putting it together
```
User code
    │
    ▼
┌────────────────────────────────────────────┐
│ Agent                                      │
│  ├─ AgentState (messages, tools, …)        │
│  ├─ Middleware ── compose_middleware()     │
│  ├─ Checkpointer ── append on message_end  │
│  └─ run_agent_loop ◄── the actual loop     │
│       │                                    │
│       ▼                                    │
│     Provider.stream() → MessageStream      │
│       │                                    │
│       └─ events → emit → subscribers       │
└────────────────────────────────────────────┘
```
That diagram is the whole framework. The rest of this site is just the details.
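To tie the diagram to code, here is a deliberately tiny loop in the same shape. Everything in it is hypothetical (Msg, ScriptedProvider, toy_loop, clock), and it omits middleware, token streaming, and HITL, which the real run_agent_loop also coordinates. The point is only the order of operations: call the provider, land and persist each message, run any requested tool, and emit events along the way.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Msg:
    role: str
    text: str = ""
    tool_call: Optional[str] = None  # name of a requested tool, if any


class ScriptedProvider:
    """Hypothetical provider that replays canned assistant replies (no network)."""

    def __init__(self, replies: list[Msg]) -> None:
        self._replies = iter(replies)

    async def stream(self, messages: list[Msg]) -> Msg:
        await asyncio.sleep(0)  # stands in for awaiting the model
        return next(self._replies)


async def toy_loop(
    provider: ScriptedProvider,
    tools: dict[str, Callable[[], Awaitable[str]]],
    history: list[Msg],
    emit: Callable[[str], None],
    checkpoint: list[Msg],
) -> list[Msg]:
    def land(msg: Msg) -> None:
        # Each finished message is added to state, persisted, then announced.
        history.append(msg)
        checkpoint.append(msg)
        emit("message_end")

    emit("agent_start")
    while True:
        emit("turn_start")
        reply = await provider.stream(history)
        land(reply)
        if reply.tool_call is None:  # no tool requested: the run is over
            emit("turn_end")
            break
        emit("tool_execution_start")
        output = await tools[reply.tool_call]()
        emit("tool_execution_end")
        land(Msg("tool", output))
        emit("turn_end")
    emit("agent_end")
    return history


async def clock() -> str:
    return "12:00"


if __name__ == "__main__":
    events: list[str] = []
    saved: list[Msg] = []
    provider = ScriptedProvider([Msg("assistant", tool_call="clock"), Msg("assistant", "It is 12:00.")])
    history = asyncio.run(
        toy_loop(provider, {"clock": clock}, [Msg("user", "What time is it?")], events.append, saved)
    )
    print(history[-1].text, events)
```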