Recipe: Multi-Provider Failover
When Anthropic is rate-limited or down, fail over to OpenAI without
crashing the agent. We'll wrap both providers behind a single
Provider adapter that does its own retry/failover logic.
Time to run: 10 minutes.
Deps: cubepi, ANTHROPIC_API_KEY, OPENAI_API_KEY.
The wrapper provider
import asyncio
import logging
import time
from typing import Sequence
from cubepi.providers.base import (
AssistantMessage,
Message,
MessageStream,
Model,
Provider,
StreamEvent,
StreamOptions,
ToolDefinition,
Usage,
)
from cubepi.providers.anthropic import AnthropicProvider
from cubepi.providers.openai import OpenAIProvider
log = logging.getLogger(__name__)
class FailoverProvider:
"""Try providers in order; fall over on construction or first-event errors.
Built-in providers swallow API/network errors and surface them as
`StreamEvent(type="error")` on the returned stream — never as exceptions
out of `provider.stream()`. So we peek at the first event from each
inner stream and only commit to it once we see a non-error event.
Limitation: errors that arrive *after* the first event (e.g. mid-stream
rate limit, server disconnect) are forwarded to the agent as-is.
Fully replaying a half-streamed turn against a fallback provider would
require buffering the whole turn — out of scope here.
"""
def __init__(self, primary_pair: tuple[Provider, Model], *fallbacks: tuple[Provider, Model]) -> None:
self._chain: list[tuple[Provider, Model]] = [primary_pair, *fallbacks]
async def stream(
self,
model: Model,
messages: list[Message],
*,
system_prompt: str = "",
tools: list[ToolDefinition] | None = None,
options: StreamOptions | None = None,
) -> MessageStream:
last_error: str | None = None
for provider, mapped_model in self._chain:
# Construction-time failures (rare — most stay inside the producer task).
try:
inner = await provider.stream(
mapped_model,
messages,
system_prompt=system_prompt,
tools=tools,
options=options,
)
except Exception as e:
log.warning("provider %s failed at construction: %s", mapped_model.provider, e)
last_error = repr(e)
continue
# Peek at the first event to learn whether the stream is healthy.
iterator = inner.__aiter__()
try:
first = await iterator.__anext__()
except StopAsyncIteration:
last_error = "stream ended before producing any events"
continue
if first.type == "error":
log.warning("provider %s errored on first event: %s",
mapped_model.provider, first.error_message)
last_error = first.error_message or "stream error"
continue
# Healthy — commit to this provider. Forward `first` plus the rest
# through a fresh outer MessageStream so the caller sees a complete
# stream starting at the start event.
outer = MessageStream()
async def _forward(first_event=first, src=iterator, src_stream=inner):
try:
outer.push(first_event)
async for ev in src:
outer.push(ev)
final = await src_stream.result()
outer.set_result(final)
except Exception as exc:
fallback_msg = AssistantMessage(
content=[],
stop_reason="error",
error_message=str(exc),
usage=Usage(),
timestamp=time.time(),
)
outer.push(StreamEvent(type="error", error_message=str(exc)))
outer.set_result(fallback_msg)
outer.attach_task(asyncio.create_task(_forward()))
return outer
raise RuntimeError(f"all providers exhausted; last error: {last_error!r}")
Use it
import asyncio
import os
from cubepi import Agent, Model
from cubepi.providers.anthropic import AnthropicProvider
from cubepi.providers.openai import OpenAIProvider
from failover import FailoverProvider
async def main():
failover = FailoverProvider(
(
AnthropicProvider(api_key=os.environ["ANTHROPIC_API_KEY"]),
Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
),
(
OpenAIProvider(api_key=os.environ["OPENAI_API_KEY"]),
Model(id="gpt-5", provider="openai"),
),
)
# The model passed here is overridden inside FailoverProvider; pass any
# placeholder. We use the primary's so usage tracking labels match the
# happy path.
agent = Agent(
provider=failover,
model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
system_prompt="You answer concisely.",
)
agent.subscribe(lambda e, s=None: None)
await agent.prompt("Capital of Mongolia?")
last = agent.state.messages[-1]
print(last.content[0].text)
asyncio.run(main())
What about smarter failover policies?
The example above falls back on any error event. That's fine for
RateLimitError, APIConnectionError, or 5xx — but arguably wrong for
BadRequestError (your code is wrong; the next provider will fail the
same way).
The first-event error_message comes from str(exc) on the
underlying SDK exception. Filter on substrings, or — better — wrap each
provider's _produce to tag the error category:
NON_RETRYABLE_HINTS = ("bad request", "invalid_request_error", "401", "403")
if first.type == "error":
msg = (first.error_message or "").lower()
if any(h in msg for h in NON_RETRYABLE_HINTS):
raise RuntimeError(f"non-retryable error from {mapped_model.provider}: {msg}")
last_error = first.error_message
continue
A more robust approach is to fork the built-in providers and re-raise
specific SDK exceptions from _produce so they reach provider.stream()
as real Python exceptions — but that's a larger change against
CubePi itself.
Adding circuit breaking
Don't keep retrying a provider that's clearly down. A simple counter:
import time
class CircuitBreaker:
def __init__(self, failure_threshold: int = 3, recovery_seconds: float = 60) -> None:
self._failures = 0
self._opened_at: float | None = None
self._threshold = failure_threshold
self._recovery = recovery_seconds
def can_attempt(self) -> bool:
if self._opened_at and time.monotonic() - self._opened_at < self._recovery:
return False
if self._opened_at:
self._opened_at = None # half-open
return True
def record_failure(self) -> None:
self._failures += 1
if self._failures >= self._threshold:
self._opened_at = time.monotonic()
self._failures = 0
def record_success(self) -> None:
self._failures = 0
Hold one CircuitBreaker per provider in the FailoverProvider,
skip if can_attempt() is False.
Per-tool failover doesn't apply
This recipe handles provider failures. Tool failures are different — see Middleware → Retries for that pattern.
Common pitfalls
- Different tool schemas across providers — Both built-in
providers accept the same
ToolDefinition, but extra-body customisations (e.g. OpenAIparallel_tool_calls=False) won't carry to Anthropic. Keep cross-provider behaviour intransform_context, not inextra_body. - Different cost — Failover from Anthropic to OpenAI changes
per-token cost. Track which provider answered (via
on_responseorAssistantMessage.provider_id) and bill accordingly. - Streaming consistency — The wrapper forwards events through a
fresh
MessageStream, so consumers see the sameStreamEventshape regardless of which provider answered. The originalstartevent comes from the inner provider unchanged. - Mid-stream errors aren't recovered — Once we've seen a healthy first event, the wrapper commits to that provider. If it errors halfway through a long response, the agent sees the error. Full mid-stream replay would require buffering — out of scope here.
See also
- Providers / Anthropic and OpenAI — provider-specific details.
- Writing a Custom Provider — the same Protocol used by this wrapper.