跳到主要内容
版本:0.5(最新)

核心概念

CubePi 的全部能力可以归纳为六个概念。这一页读一遍,后面的文档基本就 变成查表了。

Agent

Agent 是有状态的门面:你用 provider、model、可选的工具、可选的 middleware/checkpointer 构造它。然后通过三个方法驱动它:

  • await agent.prompt(message) —— 用一条 user 消息开启新一轮。
  • await agent.resume() —— 从最后一条已持久化的消息继续(配合 checkpointer 使用)。
  • agent.steer(message) / agent.follow_up(message) —— 在正在跑的 过程中插入消息,或为当前运行结束后排队下一条。

Agent 持有一个 AgentState(system prompt、tools、model、消息历史、 未结束的 tool call、流式标志)和一个 subscribers 列表:

unsubscribe = agent.subscribe(my_listener)
# ...
unsubscribe()

Subscriber 会收到循环发出的每一个 AgentEvent。可以是同步或异步函数。

Tool

AgentTool = name + description + Pydantic 参数模型 + 异步 execute

from pydantic import BaseModel
from cubepi import AgentTool, AgentToolResult, TextContent

class SearchParams(BaseModel):
query: str
limit: int = 10

async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
# 干活;如果可以被打断,记得检查 signal
return AgentToolResult(content=[TextContent(text=f"…")])

search = AgentTool(
name="search",
description="搜索语料库",
parameters=SearchParams,
execute=execute,
)

Pydantic schema 会自动转成 JSON Schema 喂给模型。参数解析、错误包装、 并行执行都由框架处理。execution_modeon_update(增量进度)、 terminate(在工具里结束本轮)见 工具使用

Provider

任何匹配下面 Protocol 的对象就是 Provider:

class Provider(Protocol):
async def stream(
self,
model: Model,
messages: list[Message],
*,
system_prompt: str = "",
tools: list[ToolDefinition] | None = None,
options: StreamOptions | None = None,
) -> MessageStream: ...

它返回一个 MessageStream —— 一个统一的异步迭代器,产出 StreamEvent, 并通过 await stream.result() 暴露最终的 AssistantMessage。内置 Provider:

  • AnthropicProvider —— Claude(Messages API,支持思考、缓存、工具使用)。
  • OpenAIProvider —— GPT 家族(Chat Completions API)。
  • OpenAIResponsesProvider —— GPT 家族(Responses API,服务端状态)。
  • FauxProvider —— 确定性测试替身(不发任何网络请求)。

实现一个方法就能写自己的。见 Providers / 自定义

Stream 和事件

流和事件分两层:

  • Provider 流 —— MessageStream 产出的是 provider 事件: starttext_starttext_deltatext_endthinking_*toolcall_*doneerror。原始 token 流。
  • Agent 事件 —— agent.subscribe(...) 收到的内容。十一种类型 覆盖整个循环:agent_startagent_endturn_startturn_endmessage_startmessage_updatemessage_endtool_execution_starttool_execution_updatetool_execution_endmessage_update 把 provider 事件嵌套在 event.stream_event 里。

做 UI 订阅 Agent 事件;做底层 token 路由就钻 event.stream_event。 见 流式事件

Middleware

Middleware 是有最多七个类型化 hook 的类:

Hook何时触发组合规则
transform_context每次调模型之前,处理消息列表链式 —— 每个收到上一个的输出
convert_to_llmprovider 序列化之前最后一个实现生效
transform_system_prompt每次调模型之前,处理 system prompt链式
before_tool_call每个工具调用之前(在参数校验后)第一个 block=True 短路
after_tool_call每个工具调用之后(在 execute 之后)后写覆盖先写
after_model_responseassistant 消息落定之后返回 TurnAction 控制流向
should_stop_after_turn每个轮次结束时任一返回 True 即停

通过 Agent(middleware=[...]) 传入。见 Middleware → 组合规则

Checkpointer

任何匹配下面 Protocol 的对象就是 Checkpointer:

class Checkpointer(Protocol):
async def load(self, thread_id: str) -> CheckpointData | None: ...
async def append(self, thread_id: str, messages: list[Message]) -> None: ...
async def save_extra(self, thread_id: str, extra: dict) -> None: ...

通过 Agent(checkpointer=cp, thread_id="…") 绑定到 Agent,循环就会在 每条消息落定时追加一行,并在第一次 prompt() 时恢复历史。内置后端: MemoryCheckpointerSQLiteCheckpointerPostgresCheckpointer。 见 Checkpointing → SQLite

Tracer(可选)

Tracer 输出符合 OpenTelemetry GenAI 语义约定 的 span,任何 OTLP 后端(Jaeger、Tempo、Honeycomb、Datadog、AWS X-Ray 等)都能直接接收,无需额外 instrumentation。先装 extra:

pip install "cubepi[tracing]" # OTel SDK
pip install "cubepi[tracing-otlp]" # + OTLP/HTTP 导出器

然后用 async with 包住 Agent:

from cubepi.tracing import Tracer
from cubepi.tracing.exporters import JsonlSpanExporter

async with (
Tracer(
service_name="my-bot",
agent_name="assistant",
exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
) as tracer,
tracer.attached(agent),
):
await agent.prompt("…")

每次 run 会发出一个 invoke_agent 根 span,其下每轮 LLM 往返对应 一个 cubepi.turn,再嵌套 chat(CLIENT)和 execute_tool 子 span。默认不记录任何 prompt 内容或模型输出 —— 需要的话用 Tracer(record_content=True) 显式打开,搭配 redact 回调脱敏。配 合 Meter(...) 还能拿到 token / 时延 / TTFC 直方图。完整指南: 追踪 → 概览

拼起来

用户代码


┌──────────────────────────────────────────┐
│ Agent │
│ ├─ AgentState (messages, tools, …) │
│ ├─ Middleware ── compose_middleware() │
│ ├─ Checkpointer ── message_end 时追加 │
│ └─ run_agent_loop ◀──── 真实的循环 │
│ │ │
│ ▼ │
│ Provider.stream() → MessageStream │
│ │ │
│ └─ events → emit → subscribers │
└──────────────────────────────────────────┘

这张图就是整个框架。文档站的其余部分,都是细节而已。