跳到主要内容
版本:0.8(最新)

cubepi.agent

Agent

class

Agent(self, *, model: BoundModel, system_prompt: str = '', tools: list[AgentTool] | None = None, thinking: ThinkingLevel = 'off', convert_to_llm: Callable[..., list[Message]] | None = None, transform_context: Callable | None = None, transform_system_prompt: Callable | None = None, after_model_response: Callable | None = None, before_tool_call: Callable | None = None, after_tool_call: Callable | None = None, should_stop_after_turn: Callable | None = None, on_run_end: Callable | None = None, on_payload: OnPayloadCallback | None = None, on_response: OnResponseCallback | None = None, steering_mode: str = 'one-at-a-time', follow_up_mode: str = 'one-at-a-time', tool_execution: str = 'parallel', checkpointer: Checkpointer | None = None, thread_id: str | None = None, middleware: list[Middleware] | None = None, channel: HitlChannel | None = None, messages: Sequence[Message] | None = None)

source

Attributes

  • convert_to_llm
  • transform_context
  • transform_system_prompt
  • after_model_response
  • before_tool_call
  • after_tool_call
  • should_stop_after_turn
  • on_run_end
  • on_payload
  • on_response
  • tool_execution
  • checkpointer
  • thread_id
  • state: AgentState
  • channel: HitlChannel | None
  • in_flight_hitl_request: HitlRequest | None

Methods

subscribe

subscribe(listener: Callable) -> Callable[[], None]

source

steer

steer(message: Message) -> None

source

cancel_steer

cancel_steer(steer_id: str) -> bool

Remove a not-yet-drained steering message by its steer_id.

Returns True if a queued message was removed; False if it was already drained or never queued (best-effort cancel).

source

follow_up

follow_up(message: Message) -> None

source

abort

abort() -> None

source

wait_for_idle

wait_for_idle() -> None

source

reset

reset() -> None

source

prompt

prompt(message: str | Message | list[Message], *, run_id: str | None = None) -> str

source

fork

fork(src_thread_id: str, new_thread_id: str, *, after_run_id: str, metadata: JsonObject | None = None) -> None

source

fork_once

fork_once(src_thread_id: str, message: str | Message | list[Message], *, after_run_id: str) -> ForkOnceResult

source

resume

resume(*, run_id: str | None = None) -> str

source

detach

detach() -> None

source

load_pending_hitl_request

load_pending_hitl_request() -> HitlRequest | None

source

respond

respond(*, question_id: str | None = None, answer: StructuredValue) -> None

source

abort_pending

abort_pending(reason: str = 'aborted by host') -> None

Abort a pending HITL request and CLOSE the conversation.

Per spec §5.2 "abort closes the conversation" — no new model call. Two-phase: Phase 1 (no lock) interrupts any in-flight HITL await via the agent signal; Phase 2 (with lock) appends synthetic deny tool_results + a terminal stop_reason="aborted" assistant message and emits AgentAbortedEvent.

source

AgentState

class

AgentState(self, system_prompt: str = '', model: Model = (lambda: Model(id='unknown', provider_id='unknown'))(), thinking: ThinkingLevel = 'off', is_streaming: bool = False, streaming_message: Message | None = None, error_message: str | None = None, active_run_id: str | None = None, last_outcome: RunOutcome | None = None, _tools: list[AgentTool] = list(), _messages: list[Message] = list(), _pending_tool_calls: set[str] = set())

source

Attributes

  • system_prompt: str
  • model: Model
  • thinking: ThinkingLevel
  • is_streaming: bool
  • streaming_message: Message | None
  • error_message: str | None
  • active_run_id: str | None
  • last_outcome: RunOutcome | None
  • tools: list[AgentTool]
  • messages: list[Message]
  • pending_tool_calls: set[str]

run_agent_loop

function

run_agent_loop(*, prompts: list[Message], context: AgentContext, provider: Provider, model: Model, convert_to_llm: Callable, emit: Callable, transform_context: Callable | None = None, transform_system_prompt: Callable | None = None, after_model_response: Callable | None = None, before_tool_call: Callable | None = None, after_tool_call: Callable | None = None, should_stop_after_turn: Callable | None = None, get_steering_messages: Callable | None = None, get_follow_up_messages: Callable | None = None, on_run_end: Callable | None = None, stream_options: StreamOptions | None = None, tool_execution: str = 'parallel', system_prompt: str | None = None, set_outcome: Callable[[str], None] | None = None) -> list[Message]

source

run_agent_loop_continue

function

run_agent_loop_continue(*, context: AgentContext, provider: Provider, model: Model, convert_to_llm: Callable, emit: Callable, transform_context: Callable | None = None, transform_system_prompt: Callable | None = None, after_model_response: Callable | None = None, before_tool_call: Callable | None = None, after_tool_call: Callable | None = None, should_stop_after_turn: Callable | None = None, get_steering_messages: Callable | None = None, get_follow_up_messages: Callable | None = None, on_run_end: Callable | None = None, stream_options: StreamOptions | None = None, tool_execution: str = 'parallel', system_prompt: str | None = None, set_outcome: Callable[[str], None] | None = None) -> list[Message]

source

tool

function

tool(fn: ToolFunc | None = None, *, name: str | None = None, description: str | None = None, execution_mode: Literal['sequential', 'parallel'] | None = None) -> AgentTool[Any] | Callable[[ToolFunc], AgentTool[Any]]

Turn an async function into an AgentTool.

Usable bare (@tool) or with arguments (@tool(name=..., ...)).

  • The tool's input schema is generated from the function's parameters; each needs a type annotation. Field(...) defaults/metadata are honoured.
  • If the function declares tool_call_id, signal, or on_update, the loop's values are passed through; otherwise they are omitted from the schema and not passed.
  • The return value may be an AgentToolResult, a str, a Content, or a list[Content].

source

execute_tool_calls

function

execute_tool_calls(context: AgentContext, assistant_message: AssistantMessage, *, tool_execution: str = 'parallel', before_tool_call: Callable | None = None, after_tool_call: Callable | None = None, signal: asyncio.Event | None = None, emit: Callable) -> ToolCallBatch

source

AfterToolCallContext

class

AfterToolCallContext(self, assistant_message: AssistantMessage, tool_call: ToolCall, args: BaseModel | JsonObject, result: AgentToolResult, is_error: bool, context: AgentContext)

source

Attributes

  • assistant_message: AssistantMessage
  • tool_call: ToolCall
  • args: BaseModel | JsonObject
  • result: AgentToolResult
  • is_error: bool
  • context: AgentContext

AfterToolCallResult

class

source

Attributes

  • content: list[Content] | None
  • details: StructuredValue
  • is_error: bool | None
  • terminate: bool | None

AgentContext

class

AgentContext(self, system_prompt: str, messages: list[Message], tools: list[AgentTool] | None = None, extra: JsonObject = dict())

source

Attributes

  • system_prompt: str
  • messages: list[Message]
  • tools: list[AgentTool] | None
  • extra: JsonObject

AgentEndEvent

class

source

Attributes

  • type: Literal['agent_end']
  • messages: list[Message]

AgentEvent

attribute

AgentEvent = AgentStartEvent | AgentEndEvent | TurnStartEvent | TurnEndEvent | MessageStartEvent | MessageUpdateEvent | MessageEndEvent | ToolExecutionStartEvent | ToolExecutionUpdateEvent | ToolExecutionEndEve...

source

AgentListener

attribute

AgentListener = Callable[[AgentEvent, asyncio.Event | None], Awaitable[None] | None]

source

AgentStartEvent

class

source

Attributes

  • type: Literal['agent_start']

AgentTool

class

AgentTool(self, name: str, description: str, parameters: type[TParams], execute: Callable[..., Awaitable[AgentToolResult]], label: str = '', execution_mode: Literal['sequential', 'parallel'] | None = None, hitl_builtin: bool = False, hitl: HitlBinding | None = None)

source

Attributes

  • name: str
  • description: str
  • parameters: type[TParams]
  • execute: Callable[..., Awaitable[AgentToolResult]]
  • label: str
  • execution_mode: Literal['sequential', 'parallel'] | None
  • hitl_builtin: bool
  • hitl: HitlBinding | None

Methods

to_definition

to_definition() -> ToolDefinition

source

AgentToolResult

class

source

Attributes

  • content: list[Content]
  • details: StructuredValue
  • is_error: bool | None
  • terminate: bool | None

BeforeToolCallContext

class

BeforeToolCallContext(self, assistant_message: AssistantMessage, tool_call: ToolCall, args: BaseModel | JsonObject, context: AgentContext)

source

Attributes

  • assistant_message: AssistantMessage
  • tool_call: ToolCall
  • args: BaseModel | JsonObject
  • context: AgentContext

BeforeToolCallResult

class

source

Attributes

  • block: bool
  • reason: str | None
  • edited_args: JsonObject | None
  • deny_reason: str | None
  • hitl_trace: StructuredObject | None

MessageEndEvent

class

source

Attributes

  • type: Literal['message_end']
  • message: Message

MessageStartEvent

class

source

Attributes

  • type: Literal['message_start']
  • message: Message

MessageUpdateEvent

class

source

Attributes

  • type: Literal['message_update']
  • message: AssistantMessage
  • stream_event: StreamEvent

ShouldStopAfterTurnContext

class

ShouldStopAfterTurnContext(self, message: AssistantMessage, tool_results: list[ToolResultMessage], context: AgentContext, new_messages: list[Message])

source

Attributes

  • message: AssistantMessage
  • tool_results: list[ToolResultMessage]
  • context: AgentContext
  • new_messages: list[Message]

ToolExecutionEndEvent

class

source

Attributes

  • type: Literal['tool_execution_end']
  • tool_call_id: str
  • tool_name: str
  • result: StructuredValue
  • is_error: bool
  • terminate: bool — True iff the tool's AgentToolResult.terminate was True (or the after_tool_call hook set terminate=True). Recorders use this to mark the turn as terminated-by-tool without unwrapping result.
  • blocked_by_hook: bool — True iff the tool call was blocked by a before_tool_call hook returning block=True. Distinguishes hook-blocks from other immediate errors (tool-not-found, arg-validation failure).
  • block_reason: str | None — When blocked_by_hook is True, the reason string from BeforeToolCallResult.reason (or None if the hook supplied no reason).

ToolExecutionStartEvent

class

source

Attributes

  • type: Literal['tool_execution_start']
  • tool_call_id: str
  • tool_name: str
  • args: JsonObject

ToolExecutionUpdateEvent

class

source

Attributes

  • type: Literal['tool_execution_update']
  • tool_call_id: str
  • tool_name: str
  • args: JsonObject | None
  • partial_result: StructuredValue

TurnEndEvent

class

source

Attributes

  • type: Literal['turn_end']
  • message: AssistantMessage
  • tool_results: list[ToolResultMessage]

TurnStartEvent

class

source

Attributes

  • type: Literal['turn_start']

emit_event

function

emit_event(emit_fn: Callable, event: object) -> None

source