Version: 0.10

cubepi.providers

AssistantMessage

class

source

Attributes

  • role: Literal['assistant']
  • content: list[Content | ThinkingContent | ToolCall]
  • stop_reason: str
  • error_message: str | None
  • usage: Usage | None
  • timestamp: float | None
  • provider_id: str
  • model_id: str
  • response_id: str | None
  • metadata: JsonObject
  • run_id: str | None

BaseProvider

class

BaseProvider(self, *, provider_id: str = '')

Concrete base class for built-in cubepi providers.

Built-in providers (Anthropic, OpenAI, OpenAI Responses, Faux) inherit from this class to gain the persistent listener registry used by cubepi.tracing and other observers. User-defined providers should inherit from BaseProvider unless they implement the full Provider protocol themselves.

Concrete subclasses must implement stream() and call _fire_listeners at three points: after the request payload is finalized, for each StreamEvent pushed onto the stream, and exactly once in a finally block after the stream terminates.

Per-call mutators (StreamOptions.on_payload, StreamOptions.on_response) retain their existing single-slot semantics and fire independently of the persistent listener registry below.

source

Attributes

  • provider_id

Methods

model

model(id: str, *, api: str = '', reasoning: bool = False, context_window: int = 200000, max_tokens: int = 8192, temperature: float = 0.7, cost: ModelCost | None = None, thinking_level_map: dict[str, str | None] | None = None) -> BoundModel

source

stream

stream(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, options: StreamOptions | None = None) -> MessageStream

source

generate

generate(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, options: StreamOptions | None = None, max_output_tokens: int | None = None, temperature: float | None = None, thinking: ThinkingLevel | None = None, thinking_budgets: ThinkingBudgets | None = None) -> AssistantMessage

Run a single provider call and return the final assistant message.

source

subscribe_request

subscribe_request(cb: OnRequestCallback) -> Callable[[], None]

Register a persistent observer for request payloads.

Returns a detach callable that removes this specific subscription.

source

subscribe_chunk

subscribe_chunk(cb: OnChunkCallback) -> Callable[[], None]

Register a persistent observer for stream chunks.

Returns a detach callable.

source

subscribe_response

subscribe_response(cb: OnResponseBodyCallback) -> Callable[[], None]

Register a persistent observer for assembled responses.

Returns a detach callable.

source

BoundModel

class

BoundModel(self, provider: Provider, spec: Model)

source

Attributes

  • provider: Provider
  • spec: Model

Methods

stream

stream(messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, options: StreamOptions | None = None) -> MessageStream

source

generate

generate(messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, options: StreamOptions | None = None, max_output_tokens: int | None = None, temperature: float | None = None, thinking: ThinkingLevel | None = None, thinking_budgets: ThinkingBudgets | None = None) -> AssistantMessage

source

generate_structured

generate_structured(output_type: type[BaseModelT], messages: list[Message], *, system_prompt: str = '', tool_name: str = 'structured_output', tool_description: str = 'Return the structured output', max_output_tokens: int | None = None, temperature: float | None = None, max_retries: int = 1) -> BaseModelT

source

Content

attribute

Content = TextContent | ImageContent

source

ImageContent

class

source

Attributes

  • type: Literal['image']
  • source: str
  • media_type: str

Message

attribute

Message = UserMessage | AssistantMessage | ToolResultMessage

source

MessageStream

class

MessageStream(self)

source

Methods

attach_task

attach_task(task: asyncio.Task) -> None

source

push

push(event: StreamEvent) -> None

source

set_result

set_result(message: AssistantMessage) -> None

source

result

result() -> AssistantMessage

Return the final assistant message.

Blocks on both the result future and the completion of the producer task. The producer's finally block runs _fire_response_listeners after set_result. Without waiting for the task as well, callers shutting down under asyncio.run could exit before async response listeners have run. Producer exceptions are NOT re-raised here, because they have already been surfaced through the result future or the stream's error events.
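The ordering problem that result() guards against can be reproduced with plain asyncio. This sketch is not cubepi's code. `MiniStream` is a hypothetical stand-in, result() is modeled as a coroutine, and a string replaces AssistantMessage. The producer sets the result and then runs a slow listener in its finally block. Because result() also waits on the attached task, the listener finishes before the caller continues.

```python
import asyncio


class MiniStream:
    """Hypothetical stand-in for MessageStream's result/producer-task wiring."""

    def __init__(self) -> None:
        self._result = asyncio.get_running_loop().create_future()
        self._task: asyncio.Task | None = None

    def attach_task(self, task: asyncio.Task) -> None:
        self._task = task

    def set_result(self, message: str) -> None:
        if not self._result.done():
            self._result.set_result(message)

    async def result(self) -> str:
        message = await self._result  # 1) the final message is available
        if self._task is not None:
            # 2) also wait for the producer to finish its finally block.
            # asyncio.wait never raises the task's exception, so producer
            # errors are not re-raised here.
            await asyncio.wait({self._task})
        return message


async def main() -> list[str]:
    log: list[str] = []
    stream = MiniStream()

    async def producer() -> None:
        try:
            stream.set_result("final message")
        finally:
            await asyncio.sleep(0.01)  # stands in for a slow async response listener
            log.append("response listeners ran")

    stream.attach_task(asyncio.create_task(producer()))
    log.append(await stream.result())
    return log


print(asyncio.run(main()))  # ['response listeners ran', 'final message']
```

If result() returned as soon as the future resolved, "final message" would be logged first, and a caller that exits right afterwards could skip the listener.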

source

Model

class

source

Attributes

  • id: str
  • provider_id: str
  • api: str
  • reasoning: bool
  • context_window: int
  • max_tokens: int
  • temperature: float
  • cost: ModelCost | None
  • thinking_level_map: dict[str, str | None] | None

ModelCost

class

source

Attributes

  • input: float
  • output: float
  • cache_read: float
  • cache_write: float

OnChunkCallback

attribute

OnChunkCallback = Callable[['StreamEvent', Model], Awaitable[None] | None]

Persistent observer. Fires for every StreamEvent pushed onto the stream (start, text_delta, thinking_delta, toolcall_delta, done, error, ...). Heavy listeners should early-return on irrelevant event types — this hook fires hot. Return value is ignored.

source

OnPayloadCallback

attribute

OnPayloadCallback = Callable[[dict, Model], Awaitable[dict | None] | dict | None]

Optional callback for inspecting or replacing provider payloads before sending. Return a dict to replace the payload, or None to keep it unchanged.
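The sketch below shows how an on_payload hook's return value might be interpreted. It assumes that both sync and async callbacks are accepted, as the `Awaitable[dict | None] | dict | None` alias suggests. `apply_on_payload` is a hypothetical helper and is not part of cubepi. The `max_tokens` payload key is only an example, since real wire payload keys depend on the provider.

```python
import asyncio
import inspect
from typing import Any


async def apply_on_payload(payload: dict, model: Any, on_payload: Any) -> dict:
    """Hypothetical helper mirroring the OnPayloadCallback return contract."""
    if on_payload is None:
        return payload
    result = on_payload(payload, model)
    if inspect.isawaitable(result):  # async callbacks are awaited
        result = await result
    return payload if result is None else result  # dict replaces, None keeps


def cap_output(payload: dict, model: Any) -> dict:
    # Build a new dict instead of mutating the caller's payload in place.
    return {**payload, "max_tokens": min(payload.get("max_tokens", 1024), 1024)}


async def log_keys(payload: dict, model: Any) -> None:
    print("sending keys:", sorted(payload))  # inspect only; returning None keeps it


original = {"model": "example-model", "max_tokens": 8192}
print(asyncio.run(apply_on_payload(original, None, cap_output)))
print(asyncio.run(apply_on_payload(original, None, log_keys)) is original)
```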

source

OnRequestCallback

attribute

OnRequestCallback = Callable[[dict, Model], Awaitable[None] | None]

Persistent observer. Fires just before HTTP send, after any per-call StreamOptions.on_payload mutation has been applied. Receives the final wire payload dict and the Model. Return value is ignored.

source

OnResponseBodyCallback

attribute

OnResponseBodyCallback = Callable[[dict | None, Model, BaseException | None], Awaitable[None] | None]

Persistent observer. Fires exactly once per stream() call, in a finally block, after the stream terminates.

  • body: the assembled provider response as a dict (the same shape a non-streaming call to the provider would have returned), or None if the stream failed before a response could be assembled.
  • exc: the exception that ended the stream (including asyncio.CancelledError), or None on normal completion.

Return value is ignored.

source

OnResponseCallback

attribute

OnResponseCallback = Callable[['ProviderResponse', Model], Awaitable[None] | None]

Optional callback invoked after an HTTP response is received.

source

Provider

class

source

Methods

stream

stream(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, options: StreamOptions | None = None) -> MessageStream

source

generate

generate(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, options: StreamOptions | None = None, max_output_tokens: int | None = None, temperature: float | None = None, thinking: ThinkingLevel | None = None, thinking_budgets: ThinkingBudgets | None = None) -> AssistantMessage

source

ProviderResponse

class

ProviderResponse(self, status: int, headers: dict[str, str] = dict())

HTTP response metadata exposed to on_response callbacks.

source

Attributes

  • status: int
  • headers: dict[str, str]

StreamEvent

class

source

Attributes

  • type: Literal['start', 'text_start', 'text_delta', 'text_end', 'thinking_start', 'thinking_delta', 'thinking_end', 'toolcall_start', 'toolcall_delta', 'toolcall_end', 'done', 'error']
  • content_index: int | None
  • delta: str | None
  • partial: AssistantMessage | None
  • error_message: str | None

StreamOptions

class

Options bag for Provider.stream(), transparent to the agent loop.

source

Attributes

  • model_config
  • thinking: ThinkingLevel
  • thinking_budgets: ThinkingBudgets | None
  • signal: asyncio.Event | None
  • on_payload: OnPayloadCallback | None
  • on_response: OnResponseCallback | None

TextContent

class

source

Attributes

  • type: Literal['text']
  • text: str

ThinkingBudgets

class

Token budgets for each thinking level.

source

Attributes

  • low: int
  • medium: int
  • high: int

ThinkingContent

class

source

Attributes

  • type: Literal['thinking']
  • thinking: str

ThinkingLevel

attribute

ThinkingLevel = Literal['off', 'low', 'medium', 'high', 'xhigh']

source

ToolCall

class

source

Attributes

  • type: Literal['tool_call']
  • id: str
  • name: str
  • arguments: JsonObject

ToolDefinition

class

source

Attributes

  • name: str
  • description: str
  • parameters: JsonObject

ToolResultMessage

class

source

Attributes

  • role: Literal['tool_result']
  • tool_call_id: str
  • tool_name: str
  • content: list[Content]
  • details: StructuredValue
  • is_error: bool
  • timestamp: float | None
  • metadata: JsonObject
  • run_id: str | None

Usage

class

source

Attributes

  • input_tokens: int
  • output_tokens: int
  • cache_read_tokens: int
  • cache_write_tokens: int

UserMessage

class

source

Attributes

  • role: Literal['user']
  • content: list[Content]
  • timestamp: float | None
  • metadata: JsonObject
  • run_id: str | None

is_synthetic_message

function

is_synthetic_message(message: Message) -> bool

Return True if message was injected by the framework rather than typed by a human.

source

synthetic_user_message

function

synthetic_user_message(text: str, *, source: str) -> UserMessage

Build a user-role message injected by the framework, not typed by a human.

All injected user-role messages MUST be created through this factory so downstream consumers can branch on a single marker (metadata["synthetic"] is True). source (e.g. "todo_guard", "goal_continuation") is for tracing/debugging only — UI behavior should never depend on it.
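The sketch below shows the single-marker convention on its own. `UserMsg`, `make_synthetic`, and `is_synthetic` are hypothetical stand-ins for UserMessage, synthetic_user_message, and is_synthetic_message. The text only says that `source` exists for tracing, so storing it under `metadata["source"]` is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Literal


@dataclass
class UserMsg:
    """Hypothetical stand-in for UserMessage (text-only content for brevity)."""
    content: list[str]
    role: Literal["user"] = "user"
    metadata: dict[str, Any] = field(default_factory=dict)


def make_synthetic(text: str, *, source: str) -> UserMsg:
    # One marker for consumers to branch on; "source" is for tracing only
    # (storing it under metadata["source"] is an assumption of this sketch).
    return UserMsg(content=[text], metadata={"synthetic": True, "source": source})


def is_synthetic(message: Any) -> bool:
    metadata = getattr(message, "metadata", None) or {}
    return metadata.get("synthetic") is True


injected = make_synthetic("Please finish the open todo items.", source="todo_guard")
typed = UserMsg(content=["hello"])
print(is_synthetic(injected), is_synthetic(typed))  # True False
```

UI code would branch only on `is_synthetic(...)` and never on the `source` string.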

source

adjust_max_tokens_for_thinking

function

adjust_max_tokens_for_thinking(base_max_tokens: int, model_max_tokens: int, reasoning_level: ThinkingLevel, custom_budgets: ThinkingBudgets | None = None) -> tuple[int, int]

Adjust max_tokens to reserve space for a thinking budget.

Given a base max_tokens (the desired output capacity), increases it to accommodate the thinking budget while respecting the model's hard cap. If the model cap is too small to fit both, the thinking budget is reduced to leave at least min_output_tokens (1024) for output.

Returns

  • A (max_tokens, thinking_budget) tuple.

source

chain_providers

function

chain_providers(model: object) -> list['BaseProvider']

Return the unique BaseProvider instances backing a bound model.

Walks a model's chain (FallbackBoundModel-like) or its single .provider (plain BoundModel-like). Uses duck typing on .chain to avoid importing cubepi.providers.fallback into this base module — fallback already depends on base, so the reverse import would cycle.

For FallbackBoundModel-like inputs, iterates the chain and dedupes providers by identity. Chain entries whose .provider isn't a BaseProvider are logged at WARNING via stdlib logging and skipped — tracing / metrics need the subscribe_request / subscribe_chunk / subscribe_response interface that only BaseProvider exposes. (cubepi does not depend on loguru; hosts that use it can intercept stdlib logging.)

For plain bound models, returns [model.provider] if it is a BaseProvider. For None or any other input, returns [].

Used by cubepi.tracing.recorder.Recorder.attach and cubepi.tracing.meter.Meter.attach to subscribe to every leg of a fallback chain so post-failover provider events land in the trace / metric stream.
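The walk can be sketched with duck typing on `.chain` and deduplication by object identity. `ProviderStub` stands in for BaseProvider, `chain_providers_sketch` is a hypothetical name, and SimpleNamespace objects play the role of the bound models.

```python
import logging
from types import SimpleNamespace

logger = logging.getLogger("chain_providers_sketch")


class ProviderStub:
    """Hypothetical stand-in for BaseProvider."""


def chain_providers_sketch(model: object) -> list[ProviderStub]:
    chain = getattr(model, "chain", None)  # duck typing: no import of the fallback module
    if chain is not None:
        seen: set[int] = set()
        providers: list[ProviderStub] = []
        for entry in chain:
            provider = getattr(entry, "provider", None)
            if not isinstance(provider, ProviderStub):
                logger.warning("skipping chain entry without a BaseProvider: %r", provider)
                continue
            if id(provider) not in seen:  # dedupe by identity, not equality
                seen.add(id(provider))
                providers.append(provider)
        return providers
    provider = getattr(model, "provider", None)  # plain BoundModel-like
    return [provider] if isinstance(provider, ProviderStub) else []  # None or other -> []


a, b = ProviderStub(), ProviderStub()
fallback = SimpleNamespace(chain=(
    SimpleNamespace(provider=a),
    SimpleNamespace(provider=b),
    SimpleNamespace(provider=a),  # same instance again: deduplicated
    SimpleNamespace(provider=object()),  # not a provider: warned and skipped
))
print(chain_providers_sketch(fallback) == [a, b])  # True
```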

source

collect_agent_providers

function

collect_agent_providers(agent: Any) -> list['BaseProvider']

Return the unique BaseProvider instances backing an agent.

Walks agent._model via chain_providers and, for legacy code paths, falls back to a public provider attribute on the agent. Shared by cubepi.tracing.recorder.Recorder.attach and cubepi.tracing.meter.Meter.attach so that both use one implementation of the provider-lookup step that precedes subscribing.

source

FauxProvider

class

FauxProvider(self, *, tokens_per_second: float | None = None, token_size_min: int = 3, token_size_max: int = 5, provider_id: str = '')

source

Attributes

  • call_count
  • pending_response_count: int
  • prompt_cache: dict[str, str] — Read-only access to the prompt cache for test assertions.

Methods

set_responses

set_responses(responses: list[FauxResponseStep]) -> None

source

append_responses

append_responses(responses: list[FauxResponseStep]) -> None

source

clear_prompt_cache

clear_prompt_cache() -> None

Clear the prompt cache, useful between test scenarios.

source

stream

stream(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, options: StreamOptions | None = None) -> MessageStream

source

faux_assistant_message

function

faux_assistant_message(content: str | FauxContentBlock | list[FauxContentBlock], *, stop_reason: str = 'stop', error_message: str | None = None) -> AssistantMessage

source

faux_text

function

faux_text(text: str) -> TextContent

source

faux_thinking

function

faux_thinking(thinking: str) -> ThinkingContent

source

faux_tool_call

function

faux_tool_call(name: str, arguments: dict[str, Any], *, id: str | None = None) -> ToolCall

source

DEFAULT_TRIGGER_ERRORS

attribute

DEFAULT_TRIGGER_ERRORS: frozenset[type[ProviderError]] = frozenset({RateLimited, ProviderUnavailable, ContextLengthExceeded})

source

FallbackBoundModel

class

FallbackBoundModel(self, chain: tuple[BoundModel, ...], trigger_errors: frozenset[type[ProviderError]] = DEFAULT_TRIGGER_ERRORS, on_failover: Callable[[BoundModel, BoundModel | None, BaseException | str], Awaitable[None] | None] | None = None)

Ordered chain of BoundModels — tries each in turn on retriable errors.

chain[0] is the primary model. On a trigger_errors exception or a first-event error from stream(), the next model in the chain is tried transparently. For generate(), an error AssistantMessage (stop_reason="error") also triggers failover. Mid-stream errors (after the first non-error event) are forwarded as-is.

provider and spec proxy chain[0] so tracing/billing code that reads agent._model.provider / agent._model.spec continues to work unchanged.

Tracer / Meter coverage: Recorder.attach() and Meter.attach() detect FallbackBoundModel and subscribe to every unique BaseProvider in the chain (via chain_providers()), so post-failover chat spans and provider metrics land in the trace / metric stream just as primary-leg calls do.

source

Attributes

  • chain: tuple[BoundModel, ...]
  • trigger_errors: frozenset[type[ProviderError]]
  • on_failover: Callable[[BoundModel, BoundModel | None, BaseException | str], Awaitable[None] | None] | None
  • provider: Provider
  • spec: Model

Methods

stream

stream(messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, options: StreamOptions | None = None) -> MessageStream

source

generate

generate(messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, tool_choice: ToolChoice | None = None, options: StreamOptions | None = None, max_output_tokens: int | None = None, temperature: float | None = None, thinking: ThinkingLevel | None = None, thinking_budgets: ThinkingBudgets | None = None) -> AssistantMessage

source

THINKING_LEVELS

attribute

THINKING_LEVELS: list[ThinkingLevel] = ['off', 'low', 'medium', 'high', 'xhigh']

source

clamp_thinking_level

function

clamp_thinking_level(model: Model, level: ThinkingLevel) -> ThinkingLevel

Clamp level to the nearest supported level for model.

If level is already supported, return it unchanged. Otherwise search upward first (higher intensity), then downward, through the ordered level list to find the closest available level.
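The search order can be sketched over a plain list of supported levels, which stands in for what get_supported_thinking_levels(model) would return. `clamp_level` is a hypothetical name. Because the search goes upward first, an unsupported "low" becomes "medium" rather than "off", even though both are one step away.

```python
LEVELS = ["off", "low", "medium", "high", "xhigh"]  # same order as THINKING_LEVELS


def clamp_level(level: str, supported: list[str]) -> str:
    if level in supported:
        return level
    i = LEVELS.index(level)
    for candidate in LEVELS[i + 1:]:  # upward first: higher intensity, nearest first
        if candidate in supported:
            return candidate
    for candidate in reversed(LEVELS[:i]):  # then downward, nearest first
        if candidate in supported:
            return candidate
    return level  # assumption: nothing supported, leave the level unchanged


print(clamp_level("low", ["off", "medium", "high"]))  # medium
print(clamp_level("xhigh", ["off", "low", "medium", "high"]))  # high
print(clamp_level("high", ["off"]))  # off
```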

source

get_supported_thinking_levels

function

get_supported_thinking_levels(model: Model) -> list[ThinkingLevel]

Return the thinking levels supported by model.

  • Non-reasoning models only support ["off"].
  • For reasoning models, levels are filtered through the model's thinking_level_map. A level mapped to None is unsupported. "xhigh" is only included when it has an explicit (non-None) mapping. All other levels are included by default when the map omits them.

source

models_are_equal

function

models_are_equal(a: Model | None, b: Model | None) -> bool

Return True if a and b refer to the same model.

Comparison is by id and provider_id. Returns False when either argument is None.

source