cubepi.agent
Agent
class
Agent(self, *, model: BoundModel, system_prompt: str = '', tools: list[AgentTool] | None = None, thinking: ThinkingLevel = 'off', convert_to_llm: Callable[..., list[Message]] | None = None, transform_context: Callable | None = None, transform_system_prompt: Callable | None = None, after_model_response: Callable | None = None, before_tool_call: Callable | None = None, after_tool_call: Callable | None = None, should_stop_after_turn: Callable | None = None, on_run_end: Callable | None = None, on_payload: OnPayloadCallback | None = None, on_response: OnResponseCallback | None = None, steering_mode: str = 'one-at-a-time', follow_up_mode: str = 'one-at-a-time', tool_execution: str = 'parallel', checkpointer: Checkpointer | None = None, thread_id: str | None = None, middleware: list[Middleware] | None = None, channel: HitlChannel | None = None, messages: Sequence[Message] | None = None)
Attributes
convert_to_llm
transform_context
transform_system_prompt
after_model_response
before_tool_call
after_tool_call
should_stop_after_turn
on_run_end
on_payload
on_response
tool_execution
checkpointer
thread_id
state: AgentState
channel: HitlChannel | None
in_flight_hitl_request: HitlRequest | None
Methods
subscribe
subscribe(listener: Callable) -> Callable[[], None]
steer
steer(message: Message) -> None
cancel_steer
cancel_steer(steer_id: str) -> bool
Remove a not-yet-drained steering message by its steer_id.
Returns True if a queued message was removed; False if it was already drained or never queued (best-effort cancel).
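The best-effort semantics described above can be illustrated with a small, self-contained sketch. `SteerQueue`, `drain_one`, and the generated id format are hypothetical stand-ins, not cubepi's implementation; in particular, the sketch returns the id from `steer` for convenience, whereas the documented `steer` returns `None`.

```python
import itertools
from collections import OrderedDict
from typing import Optional


class SteerQueue:
    """Hypothetical stand-in for a steering queue with best-effort cancel."""

    def __init__(self) -> None:
        self._pending = OrderedDict()  # steer_id -> message text, in arrival order
        self._ids = itertools.count(1)

    def steer(self, text: str) -> str:
        # Assumption: ids are generated here; the real library may carry them on the message.
        steer_id = f"steer-{next(self._ids)}"
        self._pending[steer_id] = text
        return steer_id

    def drain_one(self) -> Optional[str]:
        # Hand the oldest queued message to the loop, if any.
        if not self._pending:
            return None
        _, text = self._pending.popitem(last=False)
        return text

    def cancel_steer(self, steer_id: str) -> bool:
        # True only if the message was still queued; False if drained or unknown.
        return self._pending.pop(steer_id, None) is not None
```

A caller cannot tell "already drained" apart from "never queued" by the return value alone, which is why the cancel is described as best-effort.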
follow_up
follow_up(message: Message) -> None
abort
abort() -> None
wait_for_idle
wait_for_idle() -> None
reset
reset() -> None
prompt
prompt(message: str | Message | list[Message], *, run_id: str | None = None) -> str
fork
fork(src_thread_id: str, new_thread_id: str, *, after_run_id: str, metadata: JsonObject | None = None) -> None
fork_once
fork_once(src_thread_id: str, message: str | Message | list[Message], *, after_run_id: str) -> ForkOnceResult
resume
resume(*, run_id: str | None = None) -> str
detach
detach() -> None
load_pending_hitl_request
load_pending_hitl_request() -> HitlRequest | None
respond
respond(*, question_id: str | None = None, answer: StructuredValue) -> None
abort_pending
abort_pending(reason: str = 'aborted by host') -> None
Abort a pending HITL request and CLOSE the conversation.
Per spec §5.2 ("abort closes the conversation"), no new model call is made. The abort runs in two phases. Phase 1 (no lock) interrupts any in-flight HITL await via the agent signal. Phase 2 (with lock) appends synthetic deny tool_results and a terminal assistant message with stop_reason="aborted", then emits AgentAbortedEvent.
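As a rough illustration of Phase 2 only, the sketch below appends one synthetic deny result per pending tool call and then a terminal assistant message. The function name `close_conversation` and the dict-based message shapes are assumptions made for this example; cubepi's real `Message` types, locking, and event emission are not modelled.

```python
from typing import Any, Dict, List


def close_conversation(
    messages: List[Dict[str, Any]],
    pending_tool_call_ids: List[str],
    reason: str = "aborted by host",
) -> List[Dict[str, Any]]:
    """Return a new history that ends in an aborted assistant message."""
    closed = list(messages)  # copy so the caller's history is not mutated
    for call_id in pending_tool_call_ids:
        # Hypothetical shape for a synthetic "deny" tool result.
        closed.append(
            {
                "role": "tool_result",
                "tool_call_id": call_id,
                "is_error": True,
                "content": f"denied: {reason}",
            }
        )
    # Terminal message: no further model call follows it.
    closed.append({"role": "assistant", "stop_reason": "aborted", "content": ""})
    return closed
```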
AgentState
class
AgentState(self, system_prompt: str = '', model: Model = (lambda: Model(id='unknown', provider_id='unknown'))(), thinking: ThinkingLevel = 'off', is_streaming: bool = False, streaming_message: Message | None = None, error_message: str | None = None, active_run_id: str | None = None, last_outcome: RunOutcome | None = None, _tools: list[AgentTool] = list(), _messages: list[Message] = list(), _pending_tool_calls: set[str] = set())
Attributes
system_prompt: str
model: Model
thinking: ThinkingLevel
is_streaming: bool
streaming_message: Message | None
error_message: str | None
active_run_id: str | None
last_outcome: RunOutcome | None
tools: list[AgentTool]
messages: list[Message]
pending_tool_calls: set[str]
run_agent_loop
function
run_agent_loop(*, prompts: list[Message], context: AgentContext, provider: Provider, model: Model, convert_to_llm: Callable, emit: Callable, transform_context: Callable | None = None, transform_system_prompt: Callable | None = None, after_model_response: Callable | None = None, before_tool_call: Callable | None = None, after_tool_call: Callable | None = None, should_stop_after_turn: Callable | None = None, get_steering_messages: Callable | None = None, get_follow_up_messages: Callable | None = None, on_run_end: Callable | None = None, stream_options: StreamOptions | None = None, tool_execution: str = 'parallel', system_prompt: str | None = None, set_outcome: Callable[[str], None] | None = None) -> list[Message]
run_agent_loop_continue
function
run_agent_loop_continue(*, context: AgentContext, provider: Provider, model: Model, convert_to_llm: Callable, emit: Callable, transform_context: Callable | None = None, transform_system_prompt: Callable | None = None, after_model_response: Callable | None = None, before_tool_call: Callable | None = None, after_tool_call: Callable | None = None, should_stop_after_turn: Callable | None = None, get_steering_messages: Callable | None = None, get_follow_up_messages: Callable | None = None, on_run_end: Callable | None = None, stream_options: StreamOptions | None = None, tool_execution: str = 'parallel', system_prompt: str | None = None, set_outcome: Callable[[str], None] | None = None) -> list[Message]
tool
function
tool(fn: ToolFunc | None = None, *, name: str | None = None, description: str | None = None, execution_mode: Literal['sequential', 'parallel'] | None = None) -> AgentTool[Any] | Callable[[ToolFunc], AgentTool[Any]]
Turn an async function into an AgentTool.
Usable bare (@tool) or with arguments (@tool(name=..., ...)).
- The tool's input schema is generated from the function's parameters; each needs a type annotation. Field(...) defaults/metadata are honoured.
- If the function declares tool_call_id, signal, or on_update, the loop's values are passed through; otherwise they are omitted from the schema and not passed.
- The return value may be an AgentToolResult, a str, a Content, or a list[Content].
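The bare and parameterised decorator forms, and the idea of deriving a schema from annotations, can be sketched without the library. `sketch_tool` and `SketchTool` below are hypothetical names and the schema is a simplified name-to-type-name mapping; the real `tool` produces an `AgentTool` with a richer parameter model, so treat this only as an outline of the pattern.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Optional

# Parameter names the loop is assumed to supply itself; kept out of the schema.
INJECTED = {"tool_call_id", "signal", "on_update"}


@dataclass
class SketchTool:
    name: str
    description: str
    schema: Dict[str, str]
    fn: Callable[..., Awaitable[Any]]


def sketch_tool(fn=None, *, name: Optional[str] = None, description: Optional[str] = None):
    def wrap(f):
        schema = {}
        for param in inspect.signature(f).parameters.values():
            if param.name in INJECTED:
                continue  # injected by the loop, not part of the model-facing schema
            if param.annotation is inspect.Parameter.empty:
                raise TypeError(f"parameter {param.name!r} needs a type annotation")
            schema[param.name] = getattr(param.annotation, "__name__", str(param.annotation))
        doc = (f.__doc__ or "").strip()
        return SketchTool(name or f.__name__, description or doc, schema, f)

    # Bare use (@sketch_tool) passes the function directly;
    # parameterised use (@sketch_tool(name=...)) returns the wrapper.
    return wrap if fn is None else wrap(fn)


@sketch_tool
async def add(a: int, b: int) -> str:
    """Add two integers."""
    return str(a + b)


@sketch_tool(name="shout", description="Upper-case some text")
async def upper(text: str, tool_call_id: str) -> str:
    return f"{tool_call_id}:{text.upper()}"
```

In this sketch `upper` declares `tool_call_id`, so that name is left out of its schema, mirroring the behaviour described in the list above.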
execute_tool_calls
function
execute_tool_calls(context: AgentContext, assistant_message: AssistantMessage, *, tool_execution: str = 'parallel', before_tool_call: Callable | None = None, after_tool_call: Callable | None = None, signal: asyncio.Event | None = None, emit: Callable) -> ToolCallBatch
AfterToolCallContext
class
AfterToolCallContext(self, assistant_message: AssistantMessage, tool_call: ToolCall, args: BaseModel | JsonObject, result: AgentToolResult, is_error: bool, context: AgentContext)
Attributes
assistant_message: AssistantMessage
tool_call: ToolCall
args: BaseModel | JsonObject
result: AgentToolResult
is_error: bool
context: AgentContext
AfterToolCallResult
class
Attributes
content: list[Content] | None
details: StructuredValue
is_error: bool | None
terminate: bool | None
AgentContext
class
AgentContext(self, system_prompt: str, messages: list[Message], tools: list[AgentTool] | None = None, extra: JsonObject = dict())
Attributes
system_prompt: str
messages: list[Message]
tools: list[AgentTool] | None
extra: JsonObject
AgentEndEvent
class
Attributes
type: Literal['agent_end']
messages: list[Message]
AgentEvent
attribute
AgentEvent = AgentStartEvent | AgentEndEvent | TurnStartEvent | TurnEndEvent | MessageStartEvent | MessageUpdateEvent | MessageEndEvent | ToolExecutionStartEvent | ToolExecutionUpdateEvent | ToolExecutionEndEve...
AgentListener
attribute
AgentListener = Callable[[AgentEvent, asyncio.Event | None], Awaitable[None] | None]
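Because a listener may return either `None` or an awaitable, a dispatcher has to handle both. The sketch below shows one way to do that, together with a `subscribe` that returns an unsubscribe callable as in `Agent.subscribe`. `ListenerHub` is a hypothetical class written for this example, and passing the second argument through as an optional `asyncio.Event` is an assumption based only on the type alias.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, List, Optional, Union

Listener = Callable[[Any, Optional[asyncio.Event]], Union[Awaitable[None], None]]


class ListenerHub:
    """Hypothetical dispatcher that accepts sync and async listeners."""

    def __init__(self) -> None:
        self._listeners: List[Listener] = []

    def subscribe(self, listener: Listener) -> Callable[[], None]:
        self._listeners.append(listener)

        def unsubscribe() -> None:
            # Safe to call more than once.
            if listener in self._listeners:
                self._listeners.remove(listener)

        return unsubscribe

    async def emit(self, event: Any, signal: Optional[asyncio.Event] = None) -> None:
        # Iterate over a copy so listeners may unsubscribe during dispatch.
        for listener in list(self._listeners):
            result = listener(event, signal)
            if inspect.isawaitable(result):
                await result
```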
AgentStartEvent
class
Attributes
type: Literal['agent_start']
AgentTool
class
AgentTool(self, name: str, description: str, parameters: type[TParams], execute: Callable[..., Awaitable[AgentToolResult]], label: str = '', execution_mode: Literal['sequential', 'parallel'] | None = None, hitl_builtin: bool = False, hitl: HitlBinding | None = None)
Attributes
name: str
description: str
parameters: type[TParams]
execute: Callable[..., Awaitable[AgentToolResult]]
label: str
execution_mode: Literal['sequential', 'parallel'] | None
hitl_builtin: bool
hitl: HitlBinding | None
Methods
to_definition
to_definition() -> ToolDefinition
AgentToolResult
class
Attributes
content: list[Content]
details: StructuredValue
is_error: bool | None
terminate: bool | None
BeforeToolCallContext
class
BeforeToolCallContext(self, assistant_message: AssistantMessage, tool_call: ToolCall, args: BaseModel | JsonObject, context: AgentContext)
Attributes
assistant_message: AssistantMessage
tool_call: ToolCall
args: BaseModel | JsonObject
context: AgentContext
BeforeToolCallResult
class
Attributes
block: bool
reason: str | None
edited_args: JsonObject | None
deny_reason: str | None
hitl_trace: StructuredObject | None
MessageEndEvent
class
Attributes
type: Literal['message_end']
message: Message
MessageStartEvent
class
Attributes
type: Literal['message_start']
message: Message
MessageUpdateEvent
class
Attributes
type: Literal['message_update']
message: AssistantMessage
stream_event: StreamEvent
ShouldStopAfterTurnContext
class
ShouldStopAfterTurnContext(self, message: AssistantMessage, tool_results: list[ToolResultMessage], context: AgentContext, new_messages: list[Message])
Attributes
message: AssistantMessage
tool_results: list[ToolResultMessage]
context: AgentContext
new_messages: list[Message]
ToolExecutionEndEvent
class
Attributes
type: Literal['tool_execution_end']
tool_call_id: str
tool_name: str
result: StructuredValue
is_error: bool
terminate: bool
True iff the tool's AgentToolResult.terminate was True (or the after_tool_call hook set terminate=True). Recorders use this to mark the turn as terminated-by-tool without unwrapping result.
blocked_by_hook: bool
True iff the tool call was blocked by a before_tool_call hook returning block=True. Distinguishes hook-blocks from other immediate errors (tool-not-found, arg-validation failure).
block_reason: str | None
When blocked_by_hook is True, the reason string from BeforeToolCallResult.reason (or None if the hook supplied no reason).
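A recorder might use these flags roughly as follows. `ToolEndSketch` is a hypothetical dataclass that mirrors the documented field names, not the library's event class, and the `classify` labels are illustrative only.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ToolEndSketch:
    """Hypothetical mirror of the ToolExecutionEndEvent fields."""

    tool_call_id: str
    tool_name: str
    result: Any
    is_error: bool
    terminate: bool = False
    blocked_by_hook: bool = False
    block_reason: Optional[str] = None


def classify(event: ToolEndSketch) -> str:
    # Check the hook-block flag first so it is not lumped in with other errors.
    if event.blocked_by_hook:
        return f"blocked: {event.block_reason or 'no reason given'}"
    if event.is_error:
        return "error"
    if event.terminate:
        # No need to unwrap event.result to learn the tool ended the turn.
        return "terminated-by-tool"
    return "ok"
```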
ToolExecutionStartEvent
class
Attributes
type: Literal['tool_execution_start']
tool_call_id: str
tool_name: str
args: JsonObject
ToolExecutionUpdateEvent
class
Attributes
type: Literal['tool_execution_update']
tool_call_id: str
tool_name: str
args: JsonObject | None
partial_result: StructuredValue
TurnEndEvent
class
Attributes
type: Literal['turn_end']
message: AssistantMessage
tool_results: list[ToolResultMessage]
TurnStartEvent
class
Attributes
type: Literal['turn_start']
emit_event
function
emit_event(emit_fn: Callable, event: object) -> None