Version: 0.9

cubepi.middleware

Middleware

class

Attributes

  • hitl: HitlBinding | None

Methods

transform_context

transform_context(messages: list[Message], *, ctx: AgentContext, signal: asyncio.Event | None = None) -> list[Message]

convert_to_llm

convert_to_llm(messages: list[Message], *, ctx: AgentContext) -> list[Message]

before_tool_call

before_tool_call(ctx: BeforeToolCallContext, *, signal: asyncio.Event | None = None) -> BeforeToolCallResult | None

after_tool_call

after_tool_call(ctx: AfterToolCallContext, *, signal: asyncio.Event | None = None) -> AfterToolCallResult | None

transform_system_prompt

transform_system_prompt(system_prompt: str, *, ctx: AgentContext, signal: asyncio.Event | None = None) -> str

should_stop_after_turn

should_stop_after_turn(ctx: AgentContext) -> bool
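
As a minimal sketch of overriding a hook, the class below ends a run after a fixed number of turns through should_stop_after_turn. It assumes the hook is called once per completed turn and that returning True ends the run. AgentContext here is a stand-in that models only the extra dict, and TurnBudgetMiddleware, max_turns and the "turn_count" key are hypothetical; in cubepi the class would subclass Middleware.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AgentContext:
    """Stand-in for cubepi's AgentContext; only the extra dict is modelled."""

    extra: dict[str, Any] = field(default_factory=dict)


class TurnBudgetMiddleware:  # in cubepi this would subclass Middleware
    """Hypothetical middleware that ends the run after max_turns turns."""

    def __init__(self, max_turns: int) -> None:
        self.max_turns = max_turns

    def should_stop_after_turn(self, ctx: AgentContext) -> bool:
        # Count completed turns in ctx.extra ("turn_count" is a made-up key).
        turns = ctx.extra.get("turn_count", 0) + 1
        ctx.extra["turn_count"] = turns
        return turns >= self.max_turns


ctx = AgentContext()
budget = TurnBudgetMiddleware(max_turns=3)
print([budget.should_stop_after_turn(ctx) for _ in range(3)])  # [False, False, True]
```

Keeping the counter in ctx.extra rather than on the instance lets one middleware object serve several runs without sharing state, assuming each run gets its own AgentContext.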

after_model_response

after_model_response(response: AssistantMessage, ctx: AgentContext, *, signal: asyncio.Event | None = None) -> TurnAction | None

on_run_end

on_run_end(ctx: AgentContext, *, signal: asyncio.Event | None = None) -> list[Message] | None

extra_llm_calls

extra_llm_calls() -> Iterable[BoundModel]

Declare LLM calls this middleware drives outside the agent's main bound model.

Each entry is a BoundModel, the same handle the user gets from provider.model(...). cubepi.tracing.Recorder uses these entries to:

  • Subscribe listeners on any provider the recorder isn't already watching, so the resulting calls show up in the trace tree alongside the agent's own chat spans.
  • Identify middleware-owned calls by (spec.provider_id, spec.id) so they don't overwrite the root invoke_agent span's attribution (provider name, system prompt hash, tool list). This model-based gate handles the common "reuse one provider client, swap the model" pattern; gating on listener identity alone would attribute the middleware's first call to the agent.

The default is empty; middlewares that do not call an LLM directly need not override this method.
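
The model-based gate described above can be sketched like this. ModelSpec, BoundModel and SummarizingMiddleware are minimal stand-ins (only spec.provider_id and spec.id are modelled), and middleware_owned_keys and is_middleware_call are hypothetical helpers, not cubepi.tracing.Recorder's actual code.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class ModelSpec:
    provider_id: str
    id: str


@dataclass(frozen=True)
class BoundModel:
    """Stand-in: only spec, which carries the attribution key, is modelled."""

    spec: ModelSpec


class SummarizingMiddleware:
    """Hypothetical middleware that calls its own summary model."""

    def __init__(self, summary_model: BoundModel) -> None:
        self.summary_model = summary_model

    def extra_llm_calls(self) -> Iterable[BoundModel]:
        return (self.summary_model,)


def middleware_owned_keys(middlewares: Iterable[object]) -> set[tuple[str, str]]:
    """Collect the (provider_id, model id) pairs that middlewares declare."""
    keys: set[tuple[str, str]] = set()
    for mw in middlewares:
        declare = getattr(mw, "extra_llm_calls", None)
        if declare is None:
            continue  # treated like the empty default: no LLM calls of its own
        for bound in declare():
            keys.add((bound.spec.provider_id, bound.spec.id))
    return keys


def is_middleware_call(spec: ModelSpec, owned: set[tuple[str, str]]) -> bool:
    # Gate on the model, not the listener: one provider client may serve both.
    return (spec.provider_id, spec.id) in owned


agent_spec = ModelSpec("acme", "large-model")
summary_spec = ModelSpec("acme", "small-model")  # same provider, different model
owned = middleware_owned_keys([SummarizingMiddleware(BoundModel(summary_spec))])
print(is_middleware_call(agent_spec, owned), is_middleware_call(summary_spec, owned))  # False True
```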

TurnAction

class

TurnAction(self, response: AssistantMessage | None = None, inject_messages: list[Message] = list(), decision: Literal['natural', 'stop', 'loop_to_model'] = 'natural')

Directs the agent loop's next step after a model response.

Composition (chain): each middleware sees the previous middleware's TurnAction. The last middleware's value wins for response and decision, while inject_messages are concatenated across the chain.

Attributes

  • response: AssistantMessage | None
  • inject_messages: list[Message]
  • decision: Literal['natural', 'stop', 'loop_to_model']
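
A sketch of the chaining rule, under assumptions: each hook here receives only the previous TurnAction (or None) and returns a fresh TurnAction holding just its own additions, or None to leave the chain unchanged. The real after_model_response signature also takes the response and ctx; TurnAction is a local stand-in and chain_turn_actions is a hypothetical helper, not compose_middleware itself.

```python
from dataclasses import dataclass, field
from typing import Callable, Literal, Optional

Decision = Literal["natural", "stop", "loop_to_model"]


@dataclass
class TurnAction:
    """Stand-in with the documented fields; response is a str here."""

    response: Optional[str] = None
    inject_messages: list[str] = field(default_factory=list)
    decision: Decision = "natural"


# Hypothetical hook shape: sees the previous action, returns its own or None.
Hook = Callable[[Optional[TurnAction]], Optional[TurnAction]]


def chain_turn_actions(hooks: list[Hook]) -> Optional[TurnAction]:
    current: Optional[TurnAction] = None
    for hook in hooks:
        action = hook(current)
        if action is None:
            continue  # assumption: None leaves the previous action in place
        previous = current.inject_messages if current is not None else []
        current = TurnAction(
            response=action.response,  # last value wins
            inject_messages=previous + action.inject_messages,  # concatenated
            decision=action.decision,  # last value wins
        )
    return current


def remind(prev: Optional[TurnAction]) -> TurnAction:
    return TurnAction(inject_messages=["reminder"], decision="loop_to_model")


def passthrough(prev: Optional[TurnAction]) -> None:
    return None


def stop(prev: Optional[TurnAction]) -> TurnAction:
    return TurnAction(inject_messages=["done"], decision="stop")


final = chain_turn_actions([remind, passthrough, stop])
print(final.decision, final.inject_messages)  # stop ['reminder', 'done']
```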

compose_middleware

function

compose_middleware(middlewares: list[Middleware]) -> dict[str, Callable]

CompactionMiddleware

class

CompactionMiddleware(self, *, summary_model: BoundModel, max_tokens_before_compact: int, keep_tail_tokens: int = 8000, max_summary_tokens: int | None = None, min_compact_messages: int = 4, prune_tool_outputs: bool = True, summary_prompt: str | None = None, existing_summary_suffix: str | None = None)

Keep long histories within context by summarizing older turns.

Three layered guards keep the summariser from misbehaving under load:

  • Pre-pruning pass (cheap, no LLM call) replaces large old tool results with one-line summaries before the LLM ever sees them.
  • Circuit breaker gates only the LLM call: after _MAX_FAILURES consecutive errors it switches to the deterministic fallback summariser, which still compacts the context, so compaction never gets stuck.
  • Anti-thrashing guard skips compaction when prior runs saved less than _MIN_SAVINGS_PCT. The guard resets when savings recover, when the boundary advances by _ANTI_THRASH_NEW_MSGS messages, or when raw history exceeds max_tokens_before_compact * _ANTI_THRASH_FORCE_RATIO.
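
The circuit-breaker guard can be sketched as a small wrapper around two summariser callables. SummaryCircuitBreaker is hypothetical, max_failures stands in for _MAX_FAILURES (whose real value is not given here), and messages are plain strings. Whether the real breaker ever closes again after tripping is not documented; in this sketch it stays open, while a success before tripping resets the failure streak.

```python
from typing import Callable

Summariser = Callable[[list[str]], str]


class SummaryCircuitBreaker:
    """Gate only the LLM summariser; fall back after max_failures straight errors."""

    def __init__(self, llm: Summariser, fallback: Summariser, max_failures: int = 3) -> None:
        self.llm = llm
        self.fallback = fallback
        self.max_failures = max_failures  # plays the role of _MAX_FAILURES
        self.consecutive_failures = 0

    @property
    def is_open(self) -> bool:
        return self.consecutive_failures >= self.max_failures

    def summarise(self, messages: list[str]) -> tuple[str, bool]:
        """Return (summary, is_fallback); a summary is always produced."""
        if not self.is_open:
            try:
                summary = self.llm(messages)
            except Exception:
                self.consecutive_failures += 1
            else:
                self.consecutive_failures = 0  # a success resets the streak
                return summary, False
        # Deterministic path: no LLM call, so compaction can never get stuck.
        return self.fallback(messages), True


def failing_llm(messages: list[str]) -> str:
    raise RuntimeError("provider unavailable")


def count_fallback(messages: list[str]) -> str:
    return f"[{len(messages)} earlier messages compacted]"


breaker = SummaryCircuitBreaker(failing_llm, count_fallback, max_failures=2)
for _ in range(3):
    print(breaker.summarise(["a", "b"]), breaker.is_open)
```

The returned is_fallback flag mirrors the is_fallback attribute of CompactionState, so a caller can record which path produced the summary.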

Methods

transform_context

transform_context(messages: list[Message], *, ctx: AgentContext, signal: asyncio.Event | None = None) -> list[Message]
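
One plausible reading of how the constructor's size parameters interact in transform_context is sketched below: compact only when history exceeds max_tokens_before_compact, keep the newest messages that fit in keep_tail_tokens, and skip the summary call when fewer than min_compact_messages older messages remain. plan_compaction is hypothetical, takes precomputed per-message token counts, and ignores details such as keeping a tool call and its result on the same side of the split.

```python
from typing import Optional


def plan_compaction(
    token_counts: list[int],
    *,
    max_tokens_before_compact: int,
    keep_tail_tokens: int = 8000,
    min_compact_messages: int = 4,
) -> Optional[int]:
    """Return the split index (head is summarised, tail kept) or None.

    token_counts[i] is the estimated token size of message i, oldest first.
    """
    if sum(token_counts) <= max_tokens_before_compact:
        return None  # history still fits; nothing to do
    # Walk backwards, keeping the newest messages that fit in the tail budget.
    tail_tokens = 0
    split = len(token_counts)
    while split > 0 and tail_tokens + token_counts[split - 1] <= keep_tail_tokens:
        split -= 1
        tail_tokens += token_counts[split]
    if split < min_compact_messages:
        return None  # too few old messages to be worth a summary call
    return split


print(plan_compaction([3000] * 10, max_tokens_before_compact=20000))  # 8
```

With ten 3,000-token messages and a 20,000-token threshold, the two newest messages (6,000 tokens) fit in the default 8,000-token tail, so messages 0 to 7 would be summarised.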

extra_llm_calls

extra_llm_calls() -> tuple[BoundModel, ...]

CompactionState

class

JSON-safe summary state stored in AgentContext.extra.

Attributes

  • summary: str
  • summarized_message_ids: list[str]
  • summarized_message_refs: list[str]
  • last_summarized_message_id: str | None
  • is_fallback: bool
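
Because the state lives in AgentContext.extra, it has to survive a JSON round trip. Below is a minimal sketch with a stand-in dataclass carrying the documented fields; the field defaults, the "compaction" key and the save_state/load_state helpers are assumptions.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Optional


@dataclass
class CompactionState:
    """Stand-in with the documented fields (defaults are assumed)."""

    summary: str
    summarized_message_ids: list[str] = field(default_factory=list)
    summarized_message_refs: list[str] = field(default_factory=list)
    last_summarized_message_id: Optional[str] = None
    is_fallback: bool = False


STATE_KEY = "compaction"  # hypothetical key inside AgentContext.extra


def save_state(extra: dict[str, Any], state: CompactionState) -> None:
    # Store only dicts, lists, strings and bools so extra stays JSON-safe.
    extra[STATE_KEY] = asdict(state)


def load_state(extra: dict[str, Any]) -> Optional[CompactionState]:
    raw = extra.get(STATE_KEY)
    return CompactionState(**raw) if raw is not None else None


state = CompactionState(
    summary="User asked for a refactor; tests pass.",
    summarized_message_ids=["m1", "m2"],
    last_summarized_message_id="m2",
)
extra: dict[str, Any] = {}
save_state(extra, state)
restored = load_state(json.loads(json.dumps(extra)))  # simulate persistence
print(restored == state)  # True
```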

SubagentMiddleware

class

SubagentMiddleware(self, *, subagents: dict[str, SubagentSpec], default_model: BoundModel, shared_tools: Sequence[AgentTool[BaseModel]] = (), inherited_middleware: Sequence[Middleware] = (), excluded_tool_names: set[str] | None = None, event_mapper: EventMapper | None = None, event_handler: EventHandler | None = None, tracer: SubagentTracer | None = None)

Inject a tool that delegates one task to an ephemeral child agent.

Attributes

  • tools: list[AgentTool[BaseModel]]
  • subagents: dict[str, SubagentSpec]
  • shared_tools: tuple[AgentTool[BaseModel], ...]

SubagentRequest

class

Attributes

  • name: str
  • role: str
  • task: str
  • prompt: str
  • subagent_type: str

SubagentResult

class

SubagentResult(self, agent_id: str, text: str, events: list[StructuredValue], error: str | None = None)

Attributes

  • agent_id: str
  • text: str
  • events: list[StructuredValue]
  • error: str | None

SubagentSpec

class

SubagentSpec(self, name: str, description: str, system_prompt: str, model: BoundModel | None = None, tools: Sequence[AgentTool[BaseModel]] = tuple(), middleware: Sequence[Middleware] = tuple())

Attributes

  • name: str
  • description: str
  • system_prompt: str
  • model: BoundModel | None
  • tools: Sequence[AgentTool[BaseModel]]
  • middleware: Sequence[Middleware]
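
A sketch of how the delegation tool might pick a spec for a request, assuming subagents is keyed by the request's subagent_type and that a spec whose model is None runs on default_model. The stand-in classes drop tools and middleware, models are plain strings, and resolve_subagent and delegate are hypothetical; unknown types are reported through SubagentResult.error instead of being raised to the parent agent.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass(frozen=True)
class SubagentSpec:
    """Stand-in: tools and middleware omitted, models are plain strings."""

    name: str
    description: str
    system_prompt: str
    model: Optional[str] = None


@dataclass
class SubagentResult:
    agent_id: str
    text: str
    events: list[Any] = field(default_factory=list)
    error: Optional[str] = None


def resolve_subagent(
    subagent_type: str, subagents: dict[str, SubagentSpec], default_model: str
) -> tuple[SubagentSpec, str]:
    spec = subagents.get(subagent_type)
    if spec is None:
        known = ", ".join(sorted(subagents)) or "<none>"
        raise ValueError(f"unknown subagent_type {subagent_type!r}; expected one of: {known}")
    # Assumption: a spec without its own model runs on default_model.
    return spec, (spec.model if spec.model is not None else default_model)


def delegate(
    agent_id: str, subagent_type: str, subagents: dict[str, SubagentSpec], default_model: str
) -> SubagentResult:
    try:
        spec, model = resolve_subagent(subagent_type, subagents, default_model)
    except ValueError as exc:
        # Report the problem to the parent agent instead of crashing the run.
        return SubagentResult(agent_id=agent_id, text="", error=str(exc))
    # A real implementation would run an ephemeral child agent here.
    return SubagentResult(agent_id=agent_id, text=f"{spec.name} would run on {model}")


specs = {
    "researcher": SubagentSpec("researcher", "Finds sources", "You research."),
    "coder": SubagentSpec("coder", "Writes code", "You write code.", model="large-model"),
}
print(delegate("a1", "coder", specs, "small-model").text)
print(delegate("a2", "poet", specs, "small-model").error)
```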

Todo

class

A single todo item with content and status.

Attributes

  • content: str
  • status: Literal['pending', 'in_progress', 'completed']

TodoGuardBlocked

class

A guard escalation payload carried across the forced end turn.

Attributes

  • guard_type: TodoGuardType
  • message: str

Literal

function

Literal(self, *parameters = ())

Special typing form to define literal types (a.k.a. value types).

This form can be used to indicate to type checkers that the corresponding variable or function parameter has a value equivalent to the provided literal (or one of several literals):

from typing import Any, Literal

def validate_simple(data: Any) -> Literal[True]:  # always returns True
    ...

MODE = Literal['r', 'rb', 'w', 'wb']
def open_helper(file: str, mode: MODE) -> str:
    ...

open_helper('/some/path', 'r')      # Passes type check
open_helper('/other/path', 'typo')  # Error in type checker

Literal[...] cannot be subclassed. At runtime, an arbitrary value is allowed as type argument to Literal[...], but type checkers may impose restrictions.

TodoListMiddleware

class

TodoListMiddleware(self, *, extra_ref: Callable[[], dict[str, Any]], system_prompt: str = WRITE_TODOS_SYSTEM_PROMPT, tool_description: str = WRITE_TODOS_TOOL_DESCRIPTION)

Middleware that gives the agent a write_todos tool and enforces finalization.

Hooks:

  • tools: exposes a write_todos AgentTool that writes the todo list to extra.
  • transform_system_prompt: appends WRITE_TODOS_SYSTEM_PROMPT.
  • transform_context: renders current todo list as a UserMessage appended to context (only when todos are present), so the model always has an up-to-date view of the checklist at each turn.
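  • after_tool_call: no-op for every tool except write_todos. For a single write_todos call nothing more is needed, because execute() already writes to extra; when parallel write_todos calls are detected, the duplicate results are replaced with errors and the checklist is restored to its pre-turn snapshot.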
  • after_model_response: full guard state machine —
    • blocked guard: if blocked and pure-text → stop, clear state
    • suppression: clear guard state once past the blocked episode
    • parallel write_todos detection → inject error messages
    • payload validation errors → inject error messages
    • stale-todo soft reminder → inject UserMessage after threshold
    • finalization hard guard → loop_to_model with correction message
    • otherwise → return None (natural flow)
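
A heavily simplified sketch of two of these checks, the blocked guard and the finalization hard guard, under assumed semantics: "finalization" is read here as the model trying to end with plain text while extra["todos"] still holds items that are not completed. The BLOCKED_KEY name, the message wording and guard_turn are hypothetical; suppression, parallel detection, payload validation and stale reminders are omitted.

```python
from dataclasses import dataclass, field
from typing import Any, Literal, Optional


@dataclass
class TurnAction:
    """Stand-in with only the fields this sketch needs."""

    inject_messages: list[str] = field(default_factory=list)
    decision: Literal["natural", "stop", "loop_to_model"] = "natural"


BLOCKED_KEY = "_todo_guard_blocked"  # hypothetical extra key


def guard_turn(tool_call_names: list[str], extra: dict[str, Any]) -> Optional[TurnAction]:
    pure_text = not tool_call_names
    # Blocked guard: a blocked episode that ends in plain text stops the run.
    if extra.get(BLOCKED_KEY) and pure_text:
        extra.pop(BLOCKED_KEY)  # clear guard state
        return TurnAction(decision="stop")
    # Finalization hard guard (assumed semantics): the model tried to finish
    # while todos are still open, so loop back with a correction message.
    open_items = [t["content"] for t in extra.get("todos", []) if t["status"] != "completed"]
    if pure_text and open_items:
        note = "These todos are not completed: " + "; ".join(open_items) + "."
        return TurnAction(inject_messages=[note], decision="loop_to_model")
    return None  # natural flow


extra = {"todos": [{"content": "write tests", "status": "in_progress"}]}
print(guard_turn([], extra))
print(guard_turn(["write_todos"], extra))  # None: the model is still working
```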

Attributes

  • tools

Methods

transform_system_prompt

transform_system_prompt(system_prompt: str, *, ctx: AgentContext, signal: asyncio.Event | None = None) -> str

Append the write_todos system instructions.

transform_context

transform_context(messages: list[Message], *, ctx: AgentContext, signal: asyncio.Event | None = None) -> list[Message]

Inject current todo state as a UserMessage suffix when todos exist.

We do not rely on persisted ToolResultMessages being visible on replay, so a lightweight reminder is injected at the end of the context to ensure the model always sees the current todo list.

When no todos are set, messages are returned unchanged (no injection).
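
The injection can be sketched as a pure function over the message list. UserMessage is a stand-in, todos are assumed to be stored in extra["todos"] as JSON-style dicts, and the checkbox-style rendering is invented for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class UserMessage:
    """Stand-in for cubepi's UserMessage; only text content is modelled."""

    content: str


_MARKERS = {"pending": "[ ]", "in_progress": "[~]", "completed": "[x]"}  # hypothetical


def transform_context(messages: list[Any], extra: dict[str, Any]) -> list[Any]:
    todos = extra.get("todos") or []
    if not todos:
        return messages  # no todos: messages returned unchanged
    lines = [f"{_MARKERS[t['status']]} {t['content']}" for t in todos]
    reminder = UserMessage("Current todo list:\n" + "\n".join(lines))
    # Build a new list so the caller's history is not mutated.
    return [*messages, reminder]


history = [UserMessage("Refactor the parser")]
extra = {"todos": [{"content": "split tokenizer", "status": "in_progress"}]}
print(transform_context(history, extra)[-1].content)
```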

after_tool_call

after_tool_call(ctx: AfterToolCallContext, *, signal: asyncio.Event | None = None) -> AfterToolCallResult | None

Override duplicate write_todos results when parallel calls are detected.

For the normal (single call) case this is a no-op; write_todos.execute() already wrote the validated list to extra["todos"].

For the parallel case, after_model_response snapshotted the pre-turn todos in extra["_todos_snapshot"]. Each duplicate write_todos call's result is replaced with an error here, and extra["todos"] is restored to the snapshot so the turn leaves the checklist unchanged.
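
The snapshot-and-restore flow for parallel calls can be sketched with two helpers, one per hook. The extra["todos"] and extra["_todos_snapshot"] keys come from the description above; the helper names, the error text and the clearing of a stale snapshot on later turns are assumptions.

```python
import copy
from typing import Any

SNAPSHOT_KEY = "_todos_snapshot"
DUPLICATE_ERROR = "Error: write_todos was called more than once in one turn; todos left unchanged."


def snapshot_if_parallel(tool_call_names: list[str], extra: dict[str, Any]) -> bool:
    """after_model_response side: snapshot pre-turn todos on parallel write_todos."""
    if tool_call_names.count("write_todos") > 1:
        # Deep copy so writes by execute() cannot leak into the snapshot.
        extra[SNAPSHOT_KEY] = copy.deepcopy(extra.get("todos", []))
        return True
    extra.pop(SNAPSHOT_KEY, None)  # assumption: stale snapshots are cleared
    return False


def after_write_todos(extra: dict[str, Any], result: str) -> str:
    """after_tool_call side for write_todos results."""
    if SNAPSHOT_KEY not in extra:
        return result  # single call: execute() already wrote extra["todos"]
    extra["todos"] = copy.deepcopy(extra[SNAPSHOT_KEY])  # undo this turn's writes
    return DUPLICATE_ERROR


extra = {"todos": [{"content": "plan", "status": "pending"}]}
snapshot_if_parallel(["write_todos", "write_todos"], extra)
for new_list in ([{"content": "a", "status": "pending"}], [{"content": "b", "status": "pending"}]):
    extra["todos"] = new_list  # what each execute() call would write
    print(after_write_todos(extra, "Updated todo list"))
print(extra["todos"])  # [{'content': 'plan', 'status': 'pending'}]
```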

after_model_response

after_model_response(response: AssistantMessage, ctx: AgentContext, *, signal: asyncio.Event | None = None) -> TurnAction | None

Guard state machine.

Inspects the latest assistant message, transitions the blocked / stale-iteration state stored in extra, and returns a TurnAction when the loop needs an injected nudge or hard stop.

WriteTodosInput

class

Input schema for the write_todos tool.

Attributes

  • todos: list[Todo]
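
A standard-library sketch of validating a raw write_todos payload against the Todo schema, collecting one error per bad item so the errors can be fed back to the model. cubepi types tools as AgentTool[BaseModel], which suggests the real validation is done by pydantic; parse_write_todos and its error wording are hypothetical. typing.get_args recovers the allowed statuses from the Literal type, which is needed because Literal values are not enforced at runtime.

```python
from dataclasses import dataclass
from typing import Any, Literal, get_args

Status = Literal["pending", "in_progress", "completed"]
VALID_STATUSES = get_args(Status)  # ('pending', 'in_progress', 'completed')


@dataclass
class Todo:
    content: str
    status: Status


def parse_write_todos(payload: dict[str, Any]) -> tuple[list[Todo], list[str]]:
    """Validate a raw {"todos": [...]} payload; return (todos, errors)."""
    items = payload.get("todos")
    if not isinstance(items, list):
        return [], ["'todos' must be a list"]
    todos: list[Todo] = []
    errors: list[str] = []
    for i, item in enumerate(items):
        if not isinstance(item, dict):
            errors.append(f"todos[{i}] must be an object")
            continue
        content, status = item.get("content"), item.get("status")
        if not isinstance(content, str):
            errors.append(f"todos[{i}].content must be a string")
        elif status not in VALID_STATUSES:
            errors.append(f"todos[{i}].status must be one of {', '.join(VALID_STATUSES)}")
        else:
            todos.append(Todo(content, status))
    return todos, errors


print(parse_write_todos({"todos": [{"content": "a", "status": "pending"}, {"content": "b", "status": "done"}]}))
```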