cubepi.middleware
Middleware
class
Attributes
hitl: HitlBinding | None
Methods
transform_context
transform_context(messages: list[Message], *, ctx: AgentContext, signal: asyncio.Event | None = None) -> list[Message]
convert_to_llm
convert_to_llm(messages: list[Message], *, ctx: AgentContext) -> list[Message]
before_tool_call
before_tool_call(ctx: BeforeToolCallContext, *, signal: asyncio.Event | None = None) -> BeforeToolCallResult | None
after_tool_call
after_tool_call(ctx: AfterToolCallContext, *, signal: asyncio.Event | None = None) -> AfterToolCallResult | None
transform_system_prompt
transform_system_prompt(system_prompt: str, *, ctx: AgentContext, signal: asyncio.Event | None = None) -> str
should_stop_after_turn
should_stop_after_turn(ctx: AgentContext) -> bool
after_model_response
after_model_response(response: AssistantMessage, ctx: AgentContext, *, signal: asyncio.Event | None = None) -> TurnAction | None
on_run_end
on_run_end(ctx: AgentContext, *, signal: asyncio.Event | None = None) -> list[Message] | None
extra_llm_calls
extra_llm_calls() -> Iterable[BoundModel]
Declare LLM calls this middleware drives outside the agent's main bound model.
Each entry is a BoundModel — the same handle the user gets from
provider.model(...). cubepi.tracing.Recorder uses these to:
- Subscribe listeners on any provider the recorder isn't already watching, so the resulting calls show up in the trace tree alongside the agent's own chat spans.
- Identify middleware-owned calls by (spec.provider_id, spec.id) so they don't overwrite the root invoke_agent span's attribution (provider name, system prompt hash, tool list). This model-based gate is what handles the common "reuse one provider client, swap the model" pattern; listener identity alone would attribute the middleware's first call to the agent.
Default is empty — middlewares that do not call any LLM directly need not override.
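As a rough illustration of the override described above, the sketch below uses locally defined stand-in classes rather than the real cubepi types. ModelSpec, SummarizingMiddleware, and middleware_owned_keys are hypothetical names; only extra_llm_calls() and the (spec.provider_id, spec.id) key come from the text.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class ModelSpec:
    """Stand-in for the spec carried by a bound model (hypothetical)."""
    provider_id: str
    id: str


@dataclass(frozen=True)
class BoundModel:
    """Stand-in for the handle returned by provider.model(...)."""
    spec: ModelSpec


class Middleware:
    """Stand-in base class: the default declares no extra LLM calls."""

    def extra_llm_calls(self) -> Iterable[BoundModel]:
        return ()


class SummarizingMiddleware(Middleware):
    """Hypothetical middleware that drives its own summary model."""

    def __init__(self, summary_model: BoundModel) -> None:
        self._summary_model = summary_model

    def extra_llm_calls(self) -> Iterable[BoundModel]:
        return (self._summary_model,)


def middleware_owned_keys(middlewares: list[Middleware]) -> set[tuple[str, str]]:
    """Collect the (provider_id, model id) pairs a recorder could gate on."""
    return {
        (model.spec.provider_id, model.spec.id)
        for mw in middlewares
        for model in mw.extra_llm_calls()
    }


agent_model = BoundModel(ModelSpec("provider-a", "main-model"))
summary_model = BoundModel(ModelSpec("provider-a", "small-model"))
owned = middleware_owned_keys([Middleware(), SummarizingMiddleware(summary_model)])
```

Both models here share one provider id, so only the model id distinguishes the middleware's calls from the agent's, which is the case the model-based gate is meant to cover.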
TurnAction
class
TurnAction(self, response: AssistantMessage | None = None, inject_messages: list[Message] = list(), decision: Literal['natural', 'stop', 'loop_to_model'] = 'natural')
Directs the agent loop's next step after a model response.
Composition (chain): each middleware sees the previous middleware's TurnAction. The last middleware's value wins for response and decision; inject_messages concatenates across the chain.
Attributes
response: AssistantMessage | None
inject_messages: list[Message]
decision: Literal['natural', 'stop', 'loop_to_model']
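The composition rule can be sketched with a local stand-in for TurnAction. This is not the library's implementation: messages are plain strings here, merge is a hypothetical helper, and treating a None result as "leave the previous action untouched" is an assumption.

```python
from dataclasses import dataclass, field
from typing import Literal, Optional

Decision = Literal["natural", "stop", "loop_to_model"]


@dataclass
class TurnAction:
    """Local stand-in mirroring the documented fields."""
    response: Optional[str] = None
    inject_messages: list[str] = field(default_factory=list)
    decision: Decision = "natural"


def merge(previous: Optional[TurnAction], current: Optional[TurnAction]) -> Optional[TurnAction]:
    """Combine two chain results: the later value wins, injections concatenate."""
    if current is None:
        # Assumption: a middleware that returns None does not alter the chain.
        return previous
    if previous is None:
        return current
    return TurnAction(
        response=current.response,
        inject_messages=previous.inject_messages + current.inject_messages,
        decision=current.decision,
    )


first = TurnAction(response="draft", inject_messages=["check A"], decision="loop_to_model")
second = TurnAction(response="final", inject_messages=["check B"], decision="stop")
merged = merge(merge(None, first), second)
```

With these inputs, merged carries the second middleware's response and decision and both injected messages in chain order.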
compose_middleware
function
compose_middleware(middlewares: list[Middleware]) -> dict[str, Callable]
CompactionMiddleware
class
CompactionMiddleware(self, *, summary_model: BoundModel, max_tokens_before_compact: int, keep_tail_tokens: int = 8000, max_summary_tokens: int | None = None, min_compact_messages: int = 4, prune_tool_outputs: bool = True, summary_prompt: str | None = None, existing_summary_suffix: str | None = None)
Keep long histories within context by summarizing older turns.
Three layered guards keep the summariser from misbehaving under load:
- Pre-pruning pass (cheap, no LLM call) replaces large old tool results with one-line summaries before the LLM ever sees them.
- Circuit breaker gates only the LLM call; after _MAX_FAILURES consecutive errors, it switches to the deterministic fallback summariser (still compacts context, never gets stuck).
- Anti-thrashing guard skips compaction when prior runs saved under _MIN_SAVINGS_PCT; it resets when savings recover, the boundary advances by _ANTI_THRASH_NEW_MSGS messages, or raw history exceeds max_tokens_before_compact * _ANTI_THRASH_FORCE_RATIO.
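The circuit-breaker guard might look roughly like the sketch below. It is an illustration under assumptions, not the actual CompactionMiddleware code: MAX_FAILURES is a made-up value (the real _MAX_FAILURES is not given here), SummaryBreaker and fallback_summary are hypothetical names, and using the fallback for a call that fails before the threshold is reached, as well as never re-closing the breaker, are guesses.

```python
from typing import Callable

MAX_FAILURES = 3  # hypothetical value; the real _MAX_FAILURES is not stated in this reference


def fallback_summary(messages: list[str]) -> str:
    """Deterministic stand-in: keep the first 40 characters of each message."""
    return "\n".join(f"- {m[:40]}" for m in messages)


class SummaryBreaker:
    """Gates only the LLM summary call; the fallback path always yields a summary."""

    def __init__(self, llm_summarize: Callable[[list[str]], str]) -> None:
        self._llm_summarize = llm_summarize
        self.consecutive_failures = 0

    def summarize(self, messages: list[str]) -> tuple[str, bool]:
        """Return (summary, is_fallback)."""
        if self.consecutive_failures < MAX_FAILURES:
            try:
                summary = self._llm_summarize(messages)
            except Exception:
                # Count the failure, then fall through to the fallback below.
                self.consecutive_failures += 1
            else:
                self.consecutive_failures = 0
                return summary, False
        # Breaker open, or this call failed: compact deterministically.
        return fallback_summary(messages), True
```

Because the fallback never raises, every call returns a summary, so compaction still happens when the summary model is unavailable.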
Methods
transform_context
transform_context(messages: list[Message], *, ctx: AgentContext, signal: asyncio.Event | None = None) -> list[Message]
extra_llm_calls
extra_llm_calls() -> tuple[BoundModel, ...]
CompactionState
class
JSON-safe summary state stored in AgentContext.extra.
Attributes
summary: str
summarized_message_ids: list[str]
summarized_message_refs: list[str]
last_summarized_message_id: str | None
is_fallback: bool
SubagentMiddleware
class
SubagentMiddleware(self, *, subagents: dict[str, SubagentSpec], default_model: BoundModel, shared_tools: Sequence[AgentTool[BaseModel]] = (), inherited_middleware: Sequence[Middleware] = (), excluded_tool_names: set[str] | None = None, event_mapper: EventMapper | None = None, event_handler: EventHandler | None = None, tracer: SubagentTracer | None = None)
Inject a tool that delegates one task to an ephemeral child agent.
Attributes
tools: list[AgentTool[BaseModel]]
subagents: dict[str, SubagentSpec]
shared_tools: tuple[AgentTool[BaseModel], ...]
SubagentRequest
class
Attributes
name: str
role: str
task: str
prompt: str
subagent_type: str
SubagentResult
class
SubagentResult(self, agent_id: str, text: str, events: list[StructuredValue], error: str | None = None)
Attributes
agent_id: str
text: str
events: list[StructuredValue]
error: str | None
SubagentSpec
class
SubagentSpec(self, name: str, description: str, system_prompt: str, model: BoundModel | None = None, tools: Sequence[AgentTool[BaseModel]] = tuple(), middleware: Sequence[Middleware] = tuple())
Attributes
name: str
description: str
system_prompt: str
model: BoundModel | None
tools: Sequence[AgentTool[BaseModel]]
middleware: Sequence[Middleware]
Todo
class
A single todo item with content and status.
Attributes
content: str
status: Literal['pending', 'in_progress', 'completed']
TodoGuardBlocked
class
A guard escalation payload carried across the forced end turn.
Attributes
guard_type: TodoGuardType
message: str
Literal
function
Literal(self, *parameters = ())
Special typing form to define literal types (a.k.a. value types).
This form can be used to indicate to type checkers that the corresponding variable or function parameter has a value equivalent to the provided literal (or one of several literals):
def validate_simple(data: Any) -> Literal[True]:  # always returns True
    ...

MODE = Literal['r', 'rb', 'w', 'wb']
def open_helper(file: str, mode: MODE) -> str:
    ...

open_helper('/some/path', 'r')  # Passes type check
open_helper('/other/path', 'typo')  # Error in type checker
Literal[...] cannot be subclassed. At runtime, an arbitrary value is allowed as type argument to Literal[...], but type checkers may impose restrictions.
TodoListMiddleware
class
TodoListMiddleware(self, *, extra_ref: Callable[[], dict[str, Any]], system_prompt: str = WRITE_TODOS_SYSTEM_PROMPT, tool_description: str = WRITE_TODOS_TOOL_DESCRIPTION)
Middleware that gives the agent a write_todos tool and enforces finalization.
Hooks:
- tools: exposes the write_todos AgentTool, which writes to extra.
- transform_system_prompt: appends WRITE_TODOS_SYSTEM_PROMPT.
- transform_context: renders the current todo list as a UserMessage appended to the context (only when todos are present), so the model always has an up-to-date view of the checklist at each turn.
- after_tool_call: no-op for all tools except write_todos; the write_todos tool's execute() already writes to extra. This hook exists as an attachment point in case it is needed in future.
- after_model_response: full guard state machine:
  - blocked guard: if blocked and pure-text → stop, clear state
  - suppression: clear guard state once past the blocked episode
  - parallel write_todos detection → inject error messages
  - payload validation errors → inject error messages
  - stale-todo soft reminder → inject UserMessage after threshold
  - finalization hard guard → loop_to_model with correction message
  - otherwise → return None (natural flow)
Attributes
tools
Methods
transform_system_prompt
transform_system_prompt(system_prompt: str, *, ctx: AgentContext, signal: asyncio.Event | None = None) -> str
Append the write_todos system instructions.
transform_context
transform_context(messages: list[Message], *, ctx: AgentContext, signal: asyncio.Event | None = None) -> list[Message]
Inject current todo state as a UserMessage suffix when todos exist.
We do not rely on persisted ToolResultMessages being visible on replay, so a lightweight reminder is injected at the end of the context to ensure the model always sees the current todo list.
When no todos are set, messages are returned unchanged (no injection).
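The injection behaviour described above can be sketched as follows. Todo and UserMessage are local stand-ins, inject_todos is a hypothetical helper, and the reminder wording and the "[status] content" line format are assumptions rather than the text the middleware actually emits.

```python
from dataclasses import dataclass
from typing import Literal


@dataclass
class Todo:
    """Stand-in mirroring the documented Todo fields."""
    content: str
    status: Literal["pending", "in_progress", "completed"]


@dataclass
class UserMessage:
    """Stand-in message type holding plain text (hypothetical)."""
    text: str


def inject_todos(messages: list, todos: list[Todo]) -> list:
    """Return messages unchanged when there are no todos, else append a reminder."""
    if not todos:
        return messages
    lines = [f"[{t.status}] {t.content}" for t in todos]
    reminder = UserMessage("Current todo list:\n" + "\n".join(lines))
    # Build a new list so the caller's history is not mutated.
    return [*messages, reminder]
```

Appending to a copy rather than to the caller's list keeps the reminder out of any persisted history, in line with the note that it is re-rendered on each turn.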
after_tool_call
after_tool_call(ctx: AfterToolCallContext, *, signal: asyncio.Event | None = None) -> AfterToolCallResult | None
Override duplicate write_todos results when parallel calls are detected.
For the normal (single call) case this is a no-op; write_todos.execute() already wrote the validated list to extra["todos"].
For the parallel case, after_model_response snapshotted the pre-turn todos in extra["_todos_snapshot"]. Each duplicate write_todos call's result is replaced with an error here, and extra["todos"] is restored to the snapshot so the turn leaves the checklist unchanged.
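A minimal sketch of the snapshot-and-restore step, using a plain dict for extra. It assumes the "_todos_snapshot" key is present only when parallel calls were detected for the current turn; handle_tool_result and ERROR_TEXT are hypothetical, and the real hook returns an AfterToolCallResult rather than a string.

```python
from typing import Any, Optional

ERROR_TEXT = "write_todos may be called at most once per turn"  # hypothetical wording


def handle_tool_result(tool_name: str, extra: dict[str, Any]) -> Optional[str]:
    """Return a replacement error for a write_todos result in a parallel turn, else None."""
    if tool_name != "write_todos" or "_todos_snapshot" not in extra:
        # Other tools, and the normal single-call case, pass through untouched.
        return None
    # Restore the pre-turn checklist so the turn leaves it unchanged.
    extra["todos"] = list(extra["_todos_snapshot"])
    return ERROR_TEXT
```

Restoring from the snapshot on every duplicate call is idempotent, so the order in which the parallel results arrive does not matter.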
after_model_response
after_model_response(response: AssistantMessage, ctx: AgentContext, *, signal: asyncio.Event | None = None) -> TurnAction | None
Guard state machine.
Inspects the latest assistant message, transitions the blocked / stale-iteration state stored in extra, and returns a TurnAction when the loop needs an injected nudge or hard stop.
WriteTodosInput
class
Input schema for the write_todos tool.
Attributes
todos:list[Todo]