cubepi.providers
AssistantMessage
class
Attributes
- role: Literal['assistant']
- content: list[Content | ThinkingContent | ToolCall]
- stop_reason: str
- error_message: str | None
- usage: Usage | None
- timestamp: float | None
- provider_id: str
- model_id: str
- response_id: str | None
- metadata: JsonObject
- run_id: str | None
BaseProvider
class
BaseProvider(self, *, provider_id: str = '')
Concrete base class for built-in cubepi providers.
Built-in providers (Anthropic, OpenAI, OpenAI Responses, Faux) inherit
from this class to gain the persistent listener registry used by
cubepi.tracing and other observers. User-defined providers should
inherit from BaseProvider unless they implement the full Provider
protocol themselves.
Concrete subclasses must implement stream() and call
_fire_listeners at three points: after the request payload is
finalized, for each StreamEvent pushed onto the stream, and exactly
once in a finally block after the stream terminates.
Per-call mutators (StreamOptions.on_payload,
StreamOptions.on_response) retain their existing single-slot
semantics and fire independently of the persistent listener registry
below.
Attributes
provider_id
Methods
model
model(id: str, *, api: str = '', reasoning: bool = False, context_window: int = 200000, max_tokens: int = 8192, temperature: float = 0.7, cost: ModelCost | None = None, thinking_level_map: dict[str, str | None] | None = None) -> BoundModel
stream
stream(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, options: StreamOptions | None = None) -> MessageStream
generate
generate(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, options: StreamOptions | None = None, max_output_tokens: int | None = None, temperature: float | None = None, thinking: ThinkingLevel | None = None, thinking_budgets: ThinkingBudgets | None = None) -> AssistantMessage
Run a single provider call and return the final assistant message.
subscribe_request
subscribe_request(cb: OnRequestCallback) -> Callable[[], None]
Register a persistent observer for request payloads.
Returns a detach callable that removes this specific subscription.
subscribe_chunk
subscribe_chunk(cb: OnChunkCallback) -> Callable[[], None]
Register a persistent observer for stream chunks.
Returns a detach callable.
subscribe_response
subscribe_response(cb: OnResponseBodyCallback) -> Callable[[], None]
Register a persistent observer for assembled responses.
Returns a detach callable.
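The three subscribe_* methods share one register-and-detach pattern. The sketch below illustrates that pattern in isolation; `ListenerRegistry` and its `fire` method are hypothetical stand-ins written for this example, not cubepi's internal registry.

```python
from typing import Any, Callable


class ListenerRegistry:
    """Hypothetical stand-in for a persistent listener registry."""

    def __init__(self) -> None:
        # Key each subscription by a unique token so that detaching one
        # subscription never removes another that uses the same callback.
        self._listeners: dict[object, Callable[..., Any]] = {}

    def subscribe(self, cb: Callable[..., Any]) -> Callable[[], None]:
        token = object()
        self._listeners[token] = cb

        def detach() -> None:
            # Safe to call more than once.
            self._listeners.pop(token, None)

        return detach

    def fire(self, *args: Any) -> None:
        # Iterate over a copy so a listener may detach itself while firing.
        for cb in list(self._listeners.values()):
            cb(*args)


registry = ListenerRegistry()
seen: list = []
detach = registry.subscribe(seen.append)

registry.fire({"model": "example"})   # observed
detach()
registry.fire({"model": "ignored"})   # no longer observed
```

Keeping the detach callable is what lets an observer such as a tracer unsubscribe cleanly without the provider needing a separate unsubscribe method.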
BoundModel
class
BoundModel(self, provider: Provider, spec: Model)
Attributes
- provider: Provider
- spec: Model
Methods
stream
stream(messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, options: StreamOptions | None = None) -> MessageStream
generate
generate(messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, options: StreamOptions | None = None, max_output_tokens: int | None = None, temperature: float | None = None, thinking: ThinkingLevel | None = None, thinking_budgets: ThinkingBudgets | None = None) -> AssistantMessage
Content
attribute
Content = TextContent | ImageContent
ImageContent
class
Attributes
- type: Literal['image']
- source: str
- media_type: str
Message
attribute
Message = UserMessage | AssistantMessage | ToolResultMessage
MessageStream
class
MessageStream(self)
Methods
attach_task
attach_task(task: asyncio.Task) -> None
push
push(event: StreamEvent) -> None
set_result
set_result(message: AssistantMessage) -> None
result
result() -> AssistantMessage
Return the final assistant message.
Waits for both the result future and the producer task to complete.
The producer's finally block runs _fire_response_listeners after
set_result, so a caller that waited only on the future could exit
during asyncio.run teardown before async response listeners have run.
Producer exceptions are NOT re-raised here; they have already been
surfaced through the result future or the stream's error events.
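The ordering concern described for result() can be reproduced with plain asyncio. The sketch below is a simplified illustration, not cubepi's MessageStream: `SketchStream` is a hypothetical class, its result is a plain string, and it assumes result() is awaited.

```python
import asyncio
from typing import Optional


class SketchStream:
    """Hypothetical stand-in showing why result() also awaits the producer."""

    def __init__(self) -> None:
        self._result: asyncio.Future = asyncio.get_running_loop().create_future()
        self._task: Optional[asyncio.Task] = None

    def attach_task(self, task: asyncio.Task) -> None:
        self._task = task

    def set_result(self, message: str) -> None:
        if not self._result.done():
            self._result.set_result(message)

    async def result(self) -> str:
        message = await self._result
        if self._task is not None:
            # Wait for the producer's finally block to finish. Its exception,
            # if any, is collected rather than re-raised here.
            await asyncio.gather(self._task, return_exceptions=True)
        return message


async def main() -> list:
    log: list = []
    stream = SketchStream()

    async def producer() -> None:
        try:
            stream.set_result("final message")
        finally:
            await asyncio.sleep(0)  # simulate an async response listener
            log.append("response listener ran")

    stream.attach_task(asyncio.create_task(producer()))
    log.append(await stream.result())
    return log


log = asyncio.run(main())
```

In this sketch the listener entry is logged before the caller receives the message, because result() does not return until the producer task has finished.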
Model
class
Attributes
- id: str
- provider_id: str
- api: str
- reasoning: bool
- context_window: int
- max_tokens: int
- temperature: float
- cost: ModelCost | None
- thinking_level_map: dict[str, str | None] | None
ModelCost
class
Attributes
- input: float
- output: float
- cache_read: float
- cache_write: float
OnChunkCallback
attribute
OnChunkCallback = Callable[['StreamEvent', Model], Awaitable[None] | None]
Persistent observer. Fires for every StreamEvent pushed onto the stream (start, text_delta, thinking_delta, toolcall_delta, done, error, ...). This hook runs on the hot path, so heavy listeners should return early on irrelevant event types. Return value is ignored.
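A chunk observer that follows the early-return advice might look like the sketch below. `SketchEvent` is a hypothetical stand-in carrying only the two StreamEvent fields the example needs, and the model argument is passed as None because the observer ignores it.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in with only the StreamEvent fields used here."""
    type: str
    delta: Optional[str] = None


class TextLengthCounter:
    """Chunk observer that only does work for text_delta events."""

    def __init__(self) -> None:
        self.chars = 0

    def __call__(self, event: SketchEvent, model: Any) -> None:
        if event.type != "text_delta" or event.delta is None:
            return  # cheap exit for every other event type
        self.chars += len(event.delta)


counter = TextLengthCounter()
events = [
    SketchEvent("start"),
    SketchEvent("text_delta", "Hel"),
    SketchEvent("thinking_delta", "hmm"),
    SketchEvent("text_delta", "lo"),
    SketchEvent("done"),
]
for ev in events:
    counter(ev, None)
```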
OnPayloadCallback
attribute
OnPayloadCallback = Callable[[dict, Model], Awaitable[dict | None] | dict | None]
Optional callback for inspecting/replacing provider payloads before sending. Return a dict to replace the payload, or None to keep unchanged.
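The return contract (a dict replaces the payload, None keeps it) can be illustrated as follows. This is a sketch under assumptions: the `max_tokens` payload key is only an example, and `apply_on_payload` is a hypothetical helper showing how a caller might honor both sync and async callbacks, not cubepi's own dispatch code.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


def cap_max_tokens(payload: dict, model: Any) -> Optional[dict]:
    """Replace the payload only when max_tokens exceeds an arbitrary cap."""
    if payload.get("max_tokens", 0) <= 1000:
        return None  # keep the payload unchanged
    return {**payload, "max_tokens": 1000}


async def apply_on_payload(cb: Callable, payload: dict, model: Any) -> dict:
    """Hypothetical helper: apply the callback's return contract."""
    result = cb(payload, model)
    if inspect.isawaitable(result):  # callbacks may be sync or async
        result = await result
    return payload if result is None else result


small = asyncio.run(apply_on_payload(cap_max_tokens, {"max_tokens": 512}, None))
large = asyncio.run(apply_on_payload(cap_max_tokens, {"max_tokens": 8192}, None))
```

Returning a new dict instead of mutating the argument keeps the callback's effect explicit to whoever reads the call site.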
OnRequestCallback
attribute
OnRequestCallback = Callable[[dict, Model], Awaitable[None] | None]
Persistent observer. Fires just before HTTP send, after any per-call
StreamOptions.on_payload mutation has been applied. Receives the final
wire payload dict and the Model. Return value is ignored.
OnResponseBodyCallback
attribute
OnResponseBodyCallback = Callable[[dict | None, Model, BaseException | None], Awaitable[None] | None]
Persistent observer. Fires exactly once per stream() call, in a
finally block, after the stream terminates.
- body: assembled provider response as a dict (same shape a non-streaming call to the provider would have returned), or None if the stream failed before a response could be assembled.
- exc: the exception that ended the stream (including
asyncio.CancelledError), or None on normal completion. Return value is ignored.
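A response observer typically branches on body and exc. The sketch below invokes such an observer by hand to show the cases; in real use the provider would call it from its finally block. The final branch (no body and no exception) is a defensive assumption, not a documented outcome.

```python
import asyncio
from typing import Any, Optional

outcomes: list = []


def record_outcome(body: Optional[dict], model: Any,
                   exc: Optional[BaseException]) -> None:
    """Classify how a stream() call ended."""
    if isinstance(exc, asyncio.CancelledError):
        outcomes.append("cancelled")
    elif exc is not None:
        outcomes.append(f"failed: {type(exc).__name__}")
    elif body is not None:
        outcomes.append("ok")
    else:
        outcomes.append("no body")  # defensive; not a documented case


# Simulated invocations for illustration only.
record_outcome({"id": "resp_1"}, None, None)
record_outcome(None, None, TimeoutError())
record_outcome(None, None, asyncio.CancelledError())
```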
OnResponseCallback
attribute
OnResponseCallback = Callable[['ProviderResponse', Model], Awaitable[None] | None]
Optional callback invoked after an HTTP response is received.
Provider
class
Methods
stream
stream(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, options: StreamOptions | None = None) -> MessageStream
generate
generate(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, options: StreamOptions | None = None, max_output_tokens: int | None = None, temperature: float | None = None, thinking: ThinkingLevel | None = None, thinking_budgets: ThinkingBudgets | None = None) -> AssistantMessage
ProviderResponse
class
ProviderResponse(self, status: int, headers: dict[str, str] = dict())
HTTP response metadata exposed to on_response callbacks.
Attributes
- status: int
- headers: dict[str, str]
StreamEvent
class
Attributes
- type: Literal['start', 'text_start', 'text_delta', 'text_end', 'thinking_start', 'thinking_delta', 'thinking_end', 'toolcall_start', 'toolcall_delta', 'toolcall_end', 'done', 'error']
- content_index: int | None
- delta: str | None
- partial: AssistantMessage | None
- error_message: str | None
StreamOptions
class
Options bag for Provider.stream(), transparent to the agent loop.
Attributes
- model_config
- thinking: ThinkingLevel
- thinking_budgets: ThinkingBudgets | None
- signal: asyncio.Event | None
- on_payload: OnPayloadCallback | None
- on_response: OnResponseCallback | None
TextContent
class
Attributes
- type: Literal['text']
- text: str
ThinkingBudgets
class
Token budgets for each thinking level.
Attributes
- minimal: int
- low: int
- medium: int
- high: int
ThinkingContent
class
Attributes
- type: Literal['thinking']
- thinking: str
ThinkingLevel
attribute
ThinkingLevel = Literal['off', 'minimal', 'low', 'medium', 'high', 'xhigh']
ToolCall
class
Attributes
- type: Literal['tool_call']
- id: str
- name: str
- arguments: JsonObject
ToolDefinition
class
Attributes
- name: str
- description: str
- parameters: JsonObject
ToolResultMessage
class
Attributes
- role: Literal['tool_result']
- tool_call_id: str
- tool_name: str
- content: list[Content]
- details: StructuredValue
- is_error: bool
- timestamp: float | None
- metadata: JsonObject
- run_id: str | None
Usage
class
Attributes
- input_tokens: int
- output_tokens: int
- cache_read_tokens: int
- cache_write_tokens: int
UserMessage
class
Attributes
- role: Literal['user']
- content: list[Content]
- timestamp: float | None
- metadata: JsonObject
- run_id: str | None
adjust_max_tokens_for_thinking
function
adjust_max_tokens_for_thinking(base_max_tokens: int, model_max_tokens: int, reasoning_level: ThinkingLevel, custom_budgets: ThinkingBudgets | None = None) -> tuple[int, int]
Adjust max_tokens to reserve space for a thinking budget.
Given a base max_tokens (the desired output capacity), increases it to
accommodate the thinking budget while respecting the model's hard cap.
If the model cap is too small to fit both, the thinking budget is reduced
to leave at least min_output_tokens (1024) for output.
Returns
- A (max_tokens, thinking_budget) tuple.
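The arithmetic described above can be sketched as follows. This is an illustration under assumptions, not the library's implementation: `sketch_adjust` is a hypothetical function that takes the thinking budget directly (the per-level default budgets are not stated here), and the test for "cap too small" is one plausible reading of the description.

```python
def sketch_adjust(base_max_tokens: int, model_max_tokens: int,
                  thinking_budget: int,
                  min_output_tokens: int = 1024) -> tuple:
    # Grow the request to fit the budget, but never past the model's hard cap.
    max_tokens = min(base_max_tokens + thinking_budget, model_max_tokens)
    if max_tokens <= thinking_budget:
        # Assumed condition: the cap cannot hold both, so shrink the budget
        # to leave at least min_output_tokens for the visible output.
        thinking_budget = max(0, max_tokens - min_output_tokens)
    return max_tokens, thinking_budget


roomy = sketch_adjust(8192, 200_000, 4096)  # cap is not a constraint
tight = sketch_adjust(8192, 4000, 8192)     # cap is smaller than the budget
```

In the roomy case the budget is simply added on top of the base (8192 + 4096 = 12288). In the tight case the request is pinned to the 4000-token cap and the budget drops to 4000 - 1024 = 2976.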
FauxProvider
class
FauxProvider(self, *, tokens_per_second: float | None = None, token_size_min: int = 3, token_size_max: int = 5, provider_id: str = '')
Attributes
- call_count
- pending_response_count: int
- prompt_cache: dict[str, str]. Read-only access to the prompt cache for test assertions.
Methods
set_responses
set_responses(responses: list[FauxResponseStep]) -> None
append_responses
append_responses(responses: list[FauxResponseStep]) -> None
clear_prompt_cache
clear_prompt_cache() -> None
Clear the prompt cache, useful between test scenarios.
stream
stream(model: Model, messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, options: StreamOptions | None = None) -> MessageStream
faux_assistant_message
function
faux_assistant_message(content: str | FauxContentBlock | list[FauxContentBlock], *, stop_reason: str = 'stop', error_message: str | None = None) -> AssistantMessage
faux_text
function
faux_text(text: str) -> TextContent
faux_thinking
function
faux_thinking(thinking: str) -> ThinkingContent
faux_tool_call
function
faux_tool_call(name: str, arguments: dict[str, Any], *, id: str | None = None) -> ToolCall
DEFAULT_TRIGGER_ERRORS
attribute
DEFAULT_TRIGGER_ERRORS: frozenset[type[ProviderError]] = frozenset({RateLimited, ProviderUnavailable, ContextLengthExceeded})
FallbackBoundModel
class
FallbackBoundModel(self, chain: tuple[BoundModel, ...], trigger_errors: frozenset[type[ProviderError]] = DEFAULT_TRIGGER_ERRORS, on_failover: Callable[[BoundModel, BoundModel | None, BaseException | str], Awaitable[None] | None] | None = None)
Ordered chain of BoundModels — tries each in turn on retriable errors.
chain[0] is the primary model. On a trigger_errors exception or a first-event error from stream(), the next model in the chain is tried transparently. For generate(), an error AssistantMessage (stop_reason="error") also triggers failover. Mid-stream errors (after the first non-error event) are forwarded as-is.
provider and spec proxy chain[0] so tracing/billing code that reads agent._model.provider / agent._model.spec continues to work unchanged.
Known limitation: Tracer.attach() and Meter.attach() subscribe only to chain[0].provider. Successful fallback calls (chain[1..]) are invisible to provider-level observability (chat spans, token/cost metrics). Tracked as a follow-up: update recorder.py and meter.py to iterate chain and subscribe to every unique BaseProvider.
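The failover rule for generate() can be pictured with the simplified, synchronous sketch below. Everything in it is a hypothetical stand-in: models are plain callables, the exception classes are local, and re-raising once the chain is exhausted is an assumption. It does not cover the first-event handling that stream() performs.

```python
class SketchProviderError(Exception):
    """Hypothetical base error for this example."""


class SketchRateLimited(SketchProviderError):
    """Stands in for a retriable (trigger) error."""


class SketchAuthError(SketchProviderError):
    """Stands in for a non-retriable error."""


TRIGGERS = (SketchRateLimited,)


def generate_with_fallback(chain, prompt, on_failover=None):
    for index, model in enumerate(chain):
        nxt = chain[index + 1] if index + 1 < len(chain) else None
        try:
            return model(prompt)
        except TRIGGERS as exc:
            # Only trigger errors reach this branch; others propagate at once.
            if on_failover is not None:
                on_failover(model, nxt, exc)
            if nxt is None:
                raise  # assumption: surface the last error when exhausted
    raise ValueError("chain must not be empty")


def primary(prompt):
    raise SketchRateLimited("429")


def backup(prompt):
    return f"backup answered: {prompt}"


events = []


def log_failover(current, nxt, exc):
    events.append((current.__name__,
                   nxt.__name__ if nxt else None,
                   type(exc).__name__))


answer = generate_with_fallback((primary, backup), "hi", on_failover=log_failover)
```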
Attributes
- chain: tuple[BoundModel, ...]
- trigger_errors: frozenset[type[ProviderError]]
- on_failover: Callable[[BoundModel, BoundModel | None, BaseException | str], Awaitable[None] | None] | None
- provider: Provider
- spec: Model
Methods
stream
stream(messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, options: StreamOptions | None = None) -> MessageStream
generate
generate(messages: list[Message], *, system_prompt: str = '', tools: list[ToolDefinition] | None = None, options: StreamOptions | None = None, max_output_tokens: int | None = None, temperature: float | None = None, thinking: ThinkingLevel | None = None, thinking_budgets: ThinkingBudgets | None = None) -> AssistantMessage
THINKING_LEVELS
attribute
THINKING_LEVELS: list[ThinkingLevel] = ['off', 'minimal', 'low', 'medium', 'high', 'xhigh']
clamp_thinking_level
function
clamp_thinking_level(model: Model, level: ThinkingLevel) -> ThinkingLevel
Clamp level to the nearest supported level for model.
If level is already supported, return it unchanged. Otherwise search upward first (higher intensity), then downward, through the ordered level list to find the closest available level.
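One reading of the search order is sketched below: scan every higher level first, then every lower level. `sketch_clamp` is a hypothetical function that takes the supported list directly instead of a Model, and the fallback when nothing is supported is an assumption.

```python
LEVELS = ["off", "minimal", "low", "medium", "high", "xhigh"]


def sketch_clamp(level: str, supported: list) -> str:
    if level in supported:
        return level
    start = LEVELS.index(level)
    # Search upward first (higher intensity).
    for candidate in LEVELS[start + 1:]:
        if candidate in supported:
            return candidate
    # Then search downward, nearest level first.
    for candidate in reversed(LEVELS[:start]):
        if candidate in supported:
            return candidate
    return "off"  # assumption: fall back to "off" if nothing is supported


supported = ["off", "low", "high"]
kept = sketch_clamp("low", supported)         # already supported
raised = sketch_clamp("minimal", supported)   # nearest higher level
lowered = sketch_clamp("xhigh", supported)    # nothing higher, so step down
```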
get_supported_thinking_levels
function
get_supported_thinking_levels(model: Model) -> list[ThinkingLevel]
Return the thinking levels supported by model.
- Non-reasoning models only support ["off"].
- For reasoning models, levels are filtered through the model's thinking_level_map. A level mapped to None is unsupported. "xhigh" is only included when it has an explicit (non-None) mapping. All other levels are included by default when the map omits them.
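The filtering rules above translate fairly directly into code. The sketch below is an illustration, not the library's implementation: `sketch_supported` is a hypothetical function that takes the reasoning flag and the map as plain arguments, and the mapped value "max" is a made-up placeholder.

```python
from typing import Optional

LEVELS = ["off", "minimal", "low", "medium", "high", "xhigh"]


def sketch_supported(reasoning: bool, level_map: Optional[dict]) -> list:
    if not reasoning:
        return ["off"]
    level_map = level_map or {}
    supported = []
    for level in LEVELS:
        if level in level_map:
            # An explicit mapping decides: None means unsupported.
            if level_map[level] is not None:
                supported.append(level)
        elif level != "xhigh":
            # Omitted levels are supported by default, except "xhigh".
            supported.append(level)
    return supported


non_reasoning = sketch_supported(False, None)
defaults = sketch_supported(True, None)
custom = sketch_supported(True, {"minimal": None, "xhigh": "max"})
```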
models_are_equal
function
models_are_equal(a: Model | None, b: Model | None) -> bool
Return True if a and b refer to the same model.
Comparison is by id and provider_id. Returns False when either
argument is None.
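Two consequences of this rule are easy to miss: other fields do not affect the comparison, and two None arguments are not considered equal. The sketch below illustrates both with a hypothetical `SketchModel` dataclass and placeholder identifiers.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchModel:
    """Hypothetical stand-in with a subset of Model's fields."""
    id: str
    provider_id: str
    max_tokens: int = 8192


def sketch_models_are_equal(a: Optional[SketchModel],
                            b: Optional[SketchModel]) -> bool:
    if a is None or b is None:
        return False
    return a.id == b.id and a.provider_id == b.provider_id


same = sketch_models_are_equal(
    SketchModel("model-a", "provider-x", max_tokens=4096),
    SketchModel("model-a", "provider-x", max_tokens=8192),
)
other_provider = sketch_models_are_equal(
    SketchModel("model-a", "provider-x"),
    SketchModel("model-a", "provider-y"),
)
both_none = sketch_models_are_equal(None, None)
```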