Version: Next 🚧

Core Concepts

The concepts below cover everything CubePi does. Read this page once, and the rest of the docs becomes a lookup table.

Agent

Agent is the stateful façade: you construct it with a provider, a model, optional tools, and an optional middleware list and checkpointer. You drive it through three entry points:

  • await agent.prompt(message): start a new turn from a user message.
  • await agent.resume(): continue from the last persisted message (used with a checkpointer).
  • agent.steer(message) / agent.follow_up(message): queue a message mid-flight or after the current run.

The agent owns an AgentState (system prompt, tools, model, message history, pending tool calls, streaming flag) and a list of subscribers:

unsubscribe = agent.subscribe(my_listener)
# ...
unsubscribe()

Subscribers receive every AgentEvent the loop emits. They can be sync or async.
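The subscribe/unsubscribe contract can be pictured with a small stand-in. The sketch below is not CubePi's implementation: TinyEmitter and the dict-shaped events are hypothetical names, used only to show a subscribe call that returns an unsubscribe callable and an emit step that accepts both sync and async listeners.

```python
import asyncio
import inspect
from typing import Any, Callable

Listener = Callable[[dict], Any]


class TinyEmitter:
    """Hypothetical stand-in for the agent's subscriber list (not a CubePi class)."""

    def __init__(self) -> None:
        self._listeners: list[Listener] = []

    def subscribe(self, listener: Listener) -> Callable[[], None]:
        self._listeners.append(listener)

        def unsubscribe() -> None:
            # Safe to call more than once.
            if listener in self._listeners:
                self._listeners.remove(listener)

        return unsubscribe

    async def emit(self, event: dict) -> None:
        # Iterate over a copy so a listener may unsubscribe while being called.
        for listener in list(self._listeners):
            result = listener(event)
            if inspect.isawaitable(result):
                await result  # async listeners are awaited, sync ones are done already


async def demo() -> list[str]:
    seen: list[str] = []
    emitter = TinyEmitter()

    def sync_listener(event: dict) -> None:
        seen.append(f"sync:{event['type']}")

    async def async_listener(event: dict) -> None:
        seen.append(f"async:{event['type']}")

    unsubscribe = emitter.subscribe(sync_listener)
    emitter.subscribe(async_listener)

    await emitter.emit({"type": "agent_start"})
    unsubscribe()  # the sync listener stops receiving events from here on
    await emitter.emit({"type": "agent_end"})
    return seen


seen = asyncio.run(demo())
```

Returning the unsubscribe callable from subscribe keeps teardown local to the code that registered the listener, which is convenient for UI components with their own lifecycle.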

Tool

An AgentTool is a name + description + Pydantic parameter model + async execute function:

from pydantic import BaseModel
from cubepi import AgentTool, AgentToolResult, TextContent

class SearchParams(BaseModel):
    query: str
    limit: int = 10

async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
    # do work; respect `signal` if you can be aborted
    return AgentToolResult(content=[TextContent(text=f"…")])

search = AgentTool(
    name="search",
    description="Search the corpus",
    parameters=SearchParams,
    execute=execute,
)

The Pydantic schema is auto-converted to JSON Schema and passed to the model. Arg parsing, error wrapping, and parallel execution are handled by the framework. See Tool Use for execution_mode, on_update (incremental progress), and terminate (end the turn from a tool).
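To make the schema conversion and argument parsing concrete, here is a minimal sketch using Pydantic on its own, assuming Pydantic v2 (model_json_schema and model_validate are v2 methods). The parse_args helper is hypothetical and only illustrates the kind of error wrapping a framework could do; it is not CubePi's code.

```python
from typing import Union

from pydantic import BaseModel, ValidationError


class SearchParams(BaseModel):
    query: str
    limit: int = 10


# JSON Schema derived from the model; this is the shape a model would be shown.
schema = SearchParams.model_json_schema()

# Raw arguments as a model might send them; `limit` falls back to its default.
params = SearchParams.model_validate({"query": "cubepi"})


def parse_args(raw: dict) -> Union[SearchParams, str]:
    """Hypothetical helper: return parsed params, or an error string for the model."""
    try:
        return SearchParams.model_validate(raw)
    except ValidationError as exc:
        return f"invalid arguments: {exc.error_count()} error(s)"


# Missing `query` and a non-integer `limit` are reported instead of raising.
bad = parse_args({"limit": "many"})
```

Turning a validation failure into a plain message gives the model a chance to retry with corrected arguments rather than aborting the run.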

Provider

A Provider is anything matching this Protocol:

class Provider(Protocol):
    async def stream(
        self,
        model: Model,
        messages: list[Message],
        *,
        system_prompt: str = "",
        tools: list[ToolDefinition] | None = None,
        options: StreamOptions | None = None,
    ) -> MessageStream: ...

It returns a MessageStream: a single async iterator that yields StreamEvents and exposes the final AssistantMessage via await stream.result(). Built-in providers:

  • AnthropicProvider: Claude (Messages API, with thinking, caching, tool use).
  • OpenAIProvider: GPT family (Chat Completions API).
  • OpenAIResponsesProvider: GPT family (Responses API, server-side state).
  • FauxProvider: deterministic test double (no network).

Write your own by implementing one method. See Providers / Custom.
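As a rough picture of what "implementing one method" involves, the sketch below builds an echo provider against stand-in types. Event, EchoStream, ProviderLike, and EchoProvider are all hypothetical and use plain strings where the real Protocol uses Model, Message, and AssistantMessage; the point is only the shape: an async stream method that returns an object you can iterate with async for and then ask for a final result.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Protocol


@dataclass
class Event:
    """Stand-in for a stream event; the real StreamEvent type is not shown here."""
    type: str
    text: str = ""


class EchoStream:
    """Stand-in for MessageStream: async-iterable, with an awaitable final result."""

    def __init__(self, reply: str) -> None:
        self._reply = reply

    async def __aiter__(self) -> AsyncIterator[Event]:
        # An async generator used as __aiter__ makes the object usable in `async for`.
        yield Event("start")
        for word in self._reply.split():
            yield Event("text_delta", word)
        yield Event("done")

    async def result(self) -> str:
        # The real stream resolves to an AssistantMessage; a plain string stands in.
        return self._reply


class ProviderLike(Protocol):
    """Reduced version of the Protocol above, using stand-in types."""

    async def stream(
        self, model: str, messages: list[str], *, system_prompt: str = ""
    ) -> EchoStream: ...


class EchoProvider:
    """Satisfies ProviderLike structurally; no base class is needed."""

    async def stream(
        self, model: str, messages: list[str], *, system_prompt: str = ""
    ) -> EchoStream:
        return EchoStream(messages[-1])


async def demo(provider: ProviderLike) -> tuple[list[str], str]:
    stream = await provider.stream("faux-model", ["hello custom provider"])
    types = [event.type async for event in stream]
    return types, await stream.result()


types, final = asyncio.run(demo(EchoProvider()))
```

Because the contract is a Protocol, any object with a matching stream method qualifies, which is what lets a test double sit next to network-backed providers.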

Stream and events

Streams and events are two layers:

  • Provider streams. MessageStream yields provider events: start, text_start, text_delta, text_end, thinking_*, toolcall_*, done, error. This is the raw token stream.
  • Agent events. These are what agent.subscribe(...) receives: 14 types covering the entire loop and HITL, namely agent_start, agent_end, turn_start, turn_end, message_start, message_update, message_end, tool_execution_start, tool_execution_update, tool_execution_end, hitl_request, hitl_answer, agent_suspended, agent_aborted.

Subscribe to agent events for UI; for low-level token routing dig into event.stream_event. See Streaming Events.
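The two layers can be seen side by side in a listener that tracks lifecycle events for the UI and reaches into stream_event for tokens. This is a sketch under assumptions: the dataclasses are stand-ins, the delta field is a hypothetical name, and it assumes the provider event rides on message_update; check Streaming Events for the real shapes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamEvent:
    """Stand-in for a provider event; `delta` is a hypothetical field name."""
    type: str
    delta: str = ""


@dataclass
class AgentEvent:
    """Stand-in for an agent event carrying an optional provider event."""
    type: str
    stream_event: Optional[StreamEvent] = None


class TextCollector:
    """Listener that separates token routing from lifecycle tracking."""

    def __init__(self) -> None:
        self.chunks: list[str] = []
        self.lifecycle: list[str] = []

    def __call__(self, event: AgentEvent) -> None:
        if event.type == "message_update" and event.stream_event is not None:
            # Low-level layer: only text deltas contribute to the rendered text.
            if event.stream_event.type == "text_delta":
                self.chunks.append(event.stream_event.delta)
        else:
            # High-level layer: enough to drive spinners, turn markers, and so on.
            self.lifecycle.append(event.type)

    @property
    def text(self) -> str:
        return "".join(self.chunks)


collector = TextCollector()
for ev in [
    AgentEvent("agent_start"),
    AgentEvent("message_start"),
    AgentEvent("message_update", StreamEvent("text_start")),
    AgentEvent("message_update", StreamEvent("text_delta", "Hel")),
    AgentEvent("message_update", StreamEvent("text_delta", "lo")),
    AgentEvent("message_update", StreamEvent("text_end")),
    AgentEvent("message_end"),
    AgentEvent("agent_end"),
]:
    collector(ev)
```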

Middleware

Middleware is a class with up to seven typed hooks:

Hook | When it runs | Composition rule
transform_context | Before each model call, on the message list | Chained: each hook receives the previous result
convert_to_llm | Right before serialisation to the provider | Last implementation wins
transform_system_prompt | Before each model call, on the system prompt | Chained
before_tool_call | Per tool call, after arg validation | First block=True short-circuits
after_tool_call | Per tool call, after execute | Later override earlier
after_model_response | After the assistant message lands | Returns a TurnAction controlling flow
should_stop_after_turn | At each turn boundary | Any True stops

Pass middleware as a list to Agent(middleware=[...]). See Middleware → Composition.
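Four of the composition rules in the table can be written as small combinators, which may help when reasoning about hook order. This is a sketch of the semantics only, with hypothetical function names and plain callables instead of middleware classes; it is not how compose_middleware() is implemented.

```python
from typing import Any, Callable, Optional


def chained(hooks: list[Callable[[Any], Any]], value: Any) -> Any:
    """Chained: each hook receives the previous hook's result."""
    for hook in hooks:
        value = hook(value)
    return value


def last_wins(hooks: list[Callable[[Any], Any]], value: Any) -> Any:
    """Last implementation wins: only the final hook is applied."""
    return hooks[-1](value) if hooks else value


def first_block(
    hooks: list[Callable[[str], Optional[dict]]], tool_name: str
) -> Optional[dict]:
    """First block=True short-circuits: later hooks are never consulted."""
    for hook in hooks:
        verdict = hook(tool_name)
        if verdict and verdict.get("block"):
            return verdict
    return None


def any_true(hooks: list[Callable[[int], bool]], turn: int) -> bool:
    """Any True stops."""
    return any(hook(turn) for hook in hooks)


consulted: list[str] = []


def allow_all(name: str) -> Optional[dict]:
    consulted.append("allow_all")
    return None


def deny_bash(name: str) -> Optional[dict]:
    consulted.append("deny_bash")
    return {"block": True, "reason": "bash requires confirmation"} if name == "bash" else None


def audit(name: str) -> Optional[dict]:
    consulted.append("audit")
    return None


context = chained([lambda m: m[-2:], lambda m: ["[summary]"] + m], ["a", "b", "c"])
converted = last_wins([str.upper, str.title], "hello world")
verdict = first_block([allow_all, deny_bash, audit], "bash")
stop = any_true([lambda t: False, lambda t: t >= 3], 3)
```

Under these rules the order of the middleware list matters for chained hooks and for before_tool_call, while should_stop_after_turn gives the same answer in any order.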

Checkpointer

A Checkpointer is anything matching:

class Checkpointer(Protocol):
    async def load(self, thread_id: str) -> CheckpointData | None: ...
    async def append(self, thread_id: str, messages: list[Message]) -> None: ...
    async def save_extra(self, thread_id: str, extra: dict) -> None: ...

Bind one to an agent with Agent(checkpointer=cp, thread_id="…") and the loop will append each new message as it lands, restoring history on the first prompt(). Built-in backends: MemoryCheckpointer, SQLiteCheckpointer, PostgresCheckpointer, MySQLCheckpointer.

HITL adds two optional methods for cross-process suspend/resume: save_pending_request / load_pending_request. All first-party backends implement them. See HITL guide.
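For a feel of how small the three required methods are, here is a dict-backed sketch of the Checkpointer Protocol shown earlier in this section. The CheckpointData dataclass is a stand-in whose fields are assumptions, messages are plain strings, and merging (rather than replacing) the extra dict is a choice made for this example; the optional HITL methods are left out.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CheckpointData:
    """Stand-in; the real CheckpointData fields are not shown on this page."""
    messages: list[str] = field(default_factory=list)
    extra: dict = field(default_factory=dict)


class DictCheckpointer:
    """Hypothetical in-process backend keyed by thread_id."""

    def __init__(self) -> None:
        self._threads: dict[str, CheckpointData] = {}

    async def load(self, thread_id: str) -> Optional[CheckpointData]:
        # Unknown threads return None so the caller can start from scratch.
        return self._threads.get(thread_id)

    async def append(self, thread_id: str, messages: list[str]) -> None:
        data = self._threads.setdefault(thread_id, CheckpointData())
        data.messages.extend(messages)

    async def save_extra(self, thread_id: str, extra: dict) -> None:
        data = self._threads.setdefault(thread_id, CheckpointData())
        data.extra.update(extra)  # assumption: merge instead of replace


async def demo() -> tuple[Optional[CheckpointData], Optional[CheckpointData]]:
    cp = DictCheckpointer()
    missing = await cp.load("thread-1")
    await cp.append("thread-1", ["user: hi"])
    await cp.append("thread-1", ["assistant: hello"])
    await cp.save_extra("thread-1", {"title": "greeting"})
    return missing, await cp.load("thread-1")


missing, restored = asyncio.run(demo())
```

Append-only writes keep each save proportional to the new messages, which fits a loop that persists every message as it lands.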

HITL (Human-in-the-Loop)

CubePi ships a built-in cubepi.hitl module for scenarios where the agent needs to pause and wait for a human:

  • Sandbox confirmation: a dangerous tool (bash, file write) needs approve / deny / edit before running.
  • Mid-run questions: the agent surfaces a structured form to the user and waits for the answer.

from cubepi.hitl import InMemoryChannel, ConfirmToolCallMiddleware, ask_user_tool

channel = InMemoryChannel()

agent = Agent(
    provider=…, model=…,
    tools=[bash_tool, ask_user_tool(channel)],
    middleware=[ConfirmToolCallMiddleware(channel, require_confirm={"bash"})],
    channel=channel,
)

The channel is an await-able coroutine collaborator: tool and middleware authors write await channel.ask(...) or await channel.confirm(...) and the channel handles the suspend. Host code (your web app / TUI) subscribes to channel.subscribe() or polls channel.pending, renders the request to the user, and posts the answer via channel.answer(qid, answer).
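The suspend-and-answer handshake described above can be approximated with an asyncio Future per question. TinyChannel below is a hypothetical stand-in, not InMemoryChannel: the shape of pending, the generated question ids, and the string answers are all assumptions made for the sketch.

```python
import asyncio
import itertools


class TinyChannel:
    """Hypothetical channel: ask() suspends until host code calls answer()."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        # question id -> (question text, future resolved by the host)
        self.pending: dict[str, tuple[str, asyncio.Future]] = {}

    async def ask(self, question: str) -> str:
        qid = f"q{next(self._ids)}"
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self.pending[qid] = (question, future)
        try:
            return await future  # the tool is suspended here
        finally:
            self.pending.pop(qid, None)

    def answer(self, qid: str, answer: str) -> None:
        _, future = self.pending[qid]
        future.set_result(answer)


async def demo() -> tuple[str, str]:
    channel = TinyChannel()

    async def tool() -> str:
        reply = await channel.ask("Run the cleanup script?")
        return f"user said: {reply}"

    task = asyncio.create_task(tool())
    await asyncio.sleep(0)  # let the tool run until it suspends inside ask()

    # Host side: poll the pending requests, render one, post the answer.
    qid, (question, _) = next(iter(channel.pending.items()))
    channel.answer(qid, "approve")
    return question, await task


question, outcome = asyncio.run(demo())
```

Keeping the wait inside the channel means tool and middleware code reads as straight-line async code, while the host decides how and when the answer arrives.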

Two channel backends ship in-box:

  • InMemoryChannel: single process (CLI, notebook, tests).
  • CheckpointedChannel: cross-process (web service). The pending request is persisted to the checkpointer; a different process can answer hours later via Agent.respond(question_id=, answer=).

Full details (the three HITL verbs, both built-in middlewares, the cross-process suspend/resume protocol, events, trace spans, and error reference) are in the HITL guide.

Tracer (optional)

Tracer produces OpenTelemetry spans aligned with the GenAI Semantic Conventions, so any OTLP backend (Jaeger, Tempo, Honeycomb, Datadog, AWS X-Ray, …) can ingest agent runs without custom instrumentation. Install the extra:

pip install "cubepi[tracing]" # OTel SDK
pip install "cubepi[tracing-otlp]" # + OTLP/HTTP exporter

then wrap your agent in an async with:

from cubepi.tracing import Tracer
from cubepi.tracing.exporters import JsonlSpanExporter

async with (
    Tracer(
        service_name="my-bot",
        agent_name="assistant",
        exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
    ) as tracer,
    tracer.attached(agent),
):
    await agent.prompt("…")

Each run emits an invoke_agent root span containing one cubepi.turn per LLM round-trip, plus chat (CLIENT) and execute_tool children. By default no prompt content or model output is recorded; opt in with Tracer(record_content=True) and a redact callback for PII. Pair with Meter(...) for token / duration / TTFC histograms. Full guide: Tracing → Overview.

Putting it together

User code
    │
    ▼
┌────────────────────────────────────────────┐
│ Agent                                      │
│  ├─ AgentState (messages, tools, …)        │
│  ├─ Middleware ── compose_middleware()     │
│  ├─ Checkpointer ── append on message_end  │
│  └─ run_agent_loop ◀──── the actual loop   │
│        │                                   │
│        ▼                                   │
│  Provider.stream() → MessageStream         │
│        │                                   │
│        └─ events → emit → subscribers      │
└────────────────────────────────────────────┘

That diagram is the whole framework. The rest of this site is just the details.