Skip to main content
Version: 0.4

Middleware Examples

Working middleware for the four most common needs: rate limiting, retries, structured logging, and context truncation.

Rate limiting

Block tool calls when a user exceeds quota. Combine before_tool_call with an external rate-limiter (a token bucket, a Redis INCR, …).

import time
from cubepi import Middleware
from cubepi.agent.types import BeforeToolCallResult


class RateLimitMiddleware(Middleware):
def __init__(self, max_calls_per_min: int) -> None:
self.max = max_calls_per_min
self._timestamps: list[float] = []

async def before_tool_call(self, ctx, *, signal=None):
now = time.monotonic()
# Drop entries older than 60 s.
self._timestamps = [t for t in self._timestamps if now - t < 60]
if len(self._timestamps) >= self.max:
return BeforeToolCallResult(
block=True,
reason=f"Rate limit: {self.max} tool calls/min exceeded. Try again shortly.",
)
self._timestamps.append(now)
return None

Use:

agent = Agent(provider=, model=, middleware=[RateLimitMiddleware(max_calls_per_min=30)])

When the limit hits, the model sees a tool result that says "Rate limit exceeded…" and usually waits or asks the user.

Retries with backoff

Retry failed tool calls inside after_tool_call. Up to N times, with exponential backoff, only for transient errors.

import asyncio
from cubepi import Middleware
from cubepi.agent.types import AfterToolCallResult


class RetryMiddleware(Middleware):
def __init__(self, max_retries: int = 3, base_delay: float = 0.5) -> None:
self.max_retries = max_retries
self.base_delay = base_delay

async def after_tool_call(self, ctx, *, signal=None):
if not ctx.is_error:
return None

# Find the tool by name and re-execute up to max_retries times.
tool = next(
(t for t in (ctx.context.tools or []) if t.name == ctx.tool_call.name),
None,
)
if tool is None:
return None

for attempt in range(1, self.max_retries + 1):
await asyncio.sleep(self.base_delay * (2 ** (attempt - 1)))
try:
new_result = await tool.execute(
ctx.tool_call.id,
ctx.args,
signal=signal,
on_update=None,
)
return AfterToolCallResult(
content=new_result.content,
details={"retried": attempt, "original_error": ctx.result.content},
is_error=False,
)
except Exception:
continue

return None # give up — original error stays

Combine with caution: retrying non-idempotent tools (writes, sends, deletes) can cause real damage. Mark such tools execution_mode="sequential" and skip them here based on ctx.tool_call.name.

Structured logging

Log every tool call with its arguments, duration, and outcome. Pairs before_tool_call (to record start time) with after_tool_call (to record the result). Stash the start time in ctx.context.extra.

import time, logging
from cubepi import Middleware

log = logging.getLogger("cubepi.tools")


class ToolLoggingMiddleware(Middleware):
async def before_tool_call(self, ctx, *, signal=None):
ctx.context.extra.setdefault("_tool_starts", {})[ctx.tool_call.id] = time.monotonic()
return None

async def after_tool_call(self, ctx, *, signal=None):
started = ctx.context.extra.get("_tool_starts", {}).pop(ctx.tool_call.id, None)
duration_ms = int((time.monotonic() - started) * 1000) if started else None
log.info(
"tool_call",
extra={
"tool_name": ctx.tool_call.name,
"args": ctx.args.model_dump() if hasattr(ctx.args, "model_dump") else ctx.args,
"is_error": ctx.is_error,
"duration_ms": duration_ms,
},
)
return None

ctx.context.extra is the right place to stash per-run state because it's:

  • Visible to other middleware via the same ctx.context.
  • Persisted by checkpointers via save_extra at agent_end.
  • Reset when a new conversation starts (a new thread_id).

Sliding-window truncation

Keep the model's context bounded by retaining only the most recent N messages, plus the system prompt:

from cubepi import Middleware


class SlidingWindow(Middleware):
def __init__(self, max_messages: int = 20) -> None:
self.max_messages = max_messages

async def transform_context(self, messages, *, signal=None):
if len(messages) <= self.max_messages:
return messages
return messages[-self.max_messages:]

transform_context doesn't touch agent.state.messages — the user keeps seeing the full history. The model just sees the last N.

Pairs well with a transform_system_prompt that injects a summary of what was dropped:

class SummaryInjector(Middleware):
async def transform_system_prompt(self, system_prompt, *, signal=None):
summary = "Earlier in this conversation we discussed: …"
return f"{system_prompt}\n\nContext: {summary}".strip()

Max turns / budget cap

Hard-stop the agent after a maximum number of turns or a cost cap:

class MaxTurns(Middleware):
def __init__(self, max_turns: int) -> None:
self.max_turns = max_turns
self.turns = 0

async def should_stop_after_turn(self, ctx):
self.turns += 1
return self.turns >= self.max_turns


class BudgetCap(Middleware):
def __init__(self, usd: float, model_cost) -> None:
self.cap = usd
self.cost = model_cost # cubepi.providers.ModelCost or similar
self.spent = 0.0

async def should_stop_after_turn(self, ctx):
m = ctx.message
if m.usage:
self.spent += (
(m.usage.input_tokens / 1_000_000) * self.cost.input
+ (m.usage.output_tokens / 1_000_000) * self.cost.output
)
return self.spent >= self.cap

Structured output with after_model_response

Validate JSON output and re-prompt if it doesn't parse:

import json
from cubepi import Middleware
from cubepi.middleware.base import TurnAction
from cubepi.providers.base import TextContent, UserMessage


class JSONOutputValidator(Middleware):
def __init__(self, schema_cls) -> None:
self.schema = schema_cls

async def after_model_response(self, response, ctx, *, signal=None):
text = "".join(
c.text for c in response.content if isinstance(c, TextContent)
)
try:
obj = json.loads(text)
self.schema.model_validate(obj)
return None # valid — proceed naturally
except Exception as e:
return TurnAction(
inject_messages=[
UserMessage(content=[TextContent(text=f"Invalid output: {e}. Return valid JSON.")]),
],
decision="loop_to_model",
)

The agent will skip tool execution and immediately re-prompt the model with the feedback message in context.

See also