跳到主要内容
版本:Next 🚧

核心概念

CubePi 的全部能力可以归纳为六个概念。这一页读一遍,后面的文档基本就 变成查表了。

Agent

Agent 是有状态的门面:你用 provider、model、可选的工具、可选的 middleware/checkpointer 构造它。然后通过三个方法驱动它:

  • await agent.prompt(message) —— 用一条 user 消息开启新一轮。
  • await agent.resume() —— 从最后一条已持久化的消息继续(配合 checkpointer 使用)。
  • agent.steer(message) / agent.follow_up(message) —— 在正在跑的 过程中插入消息,或为当前运行结束后排队下一条。

Agent 持有一个 AgentState(system prompt、tools、model、消息历史、 未结束的 tool call、流式标志)和一个 subscribers 列表:

unsubscribe = agent.subscribe(my_listener)
# ...
unsubscribe()

Subscriber 会收到循环发出的每一个 AgentEvent。可以是同步或异步函数。

Tool

AgentTool = name + description + Pydantic 参数模型 + 异步 execute

from pydantic import BaseModel
from cubepi import AgentTool, AgentToolResult, TextContent

class SearchParams(BaseModel):
query: str
limit: int = 10

async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
# 干活;如果可以被打断,记得检查 signal
return AgentToolResult(content=[TextContent(text=f"…")])

search = AgentTool(
name="search",
description="搜索语料库",
parameters=SearchParams,
execute=execute,
)

Pydantic schema 会自动转成 JSON Schema 喂给模型。参数解析、错误包装、 并行执行都由框架处理。execution_modeon_update(增量进度)、 terminate(在工具里结束本轮)见 工具使用

Provider

任何匹配下面 Protocol 的对象就是 Provider:

class Provider(Protocol):
async def stream(
self,
model: Model,
messages: list[Message],
*,
system_prompt: str = "",
tools: list[ToolDefinition] | None = None,
options: StreamOptions | None = None,
) -> MessageStream: ...

它返回一个 MessageStream —— 一个统一的异步迭代器,产出 StreamEvent, 并通过 await stream.result() 暴露最终的 AssistantMessage。内置 Provider:

  • AnthropicProvider —— Claude(Messages API,支持思考、缓存、工具使用)。
  • OpenAIProvider —— GPT 家族(Chat Completions API)。
  • OpenAIResponsesProvider —— GPT 家族(Responses API,服务端状态)。
  • FauxProvider —— 确定性测试替身(不发任何网络请求)。

实现一个方法就能写自己的。见 Providers / 自定义

Stream 和事件

流和事件分两层:

  • Provider 流 —— MessageStream 产出的是 provider 事件: starttext_starttext_deltatext_endthinking_*toolcall_*doneerror。原始 token 流。
  • Agent 事件 —— agent.subscribe(...) 收到的内容。十四种类型 覆盖整个循环 + HITL:agent_startagent_endturn_startturn_endmessage_startmessage_updatemessage_endtool_execution_starttool_execution_updatetool_execution_endhitl_requesthitl_answeragent_suspendedagent_aborted

做 UI 订阅 Agent 事件;做底层 token 路由就钻 event.stream_event。 见 流式事件

Middleware

Middleware 是有最多七个类型化 hook 的类:

Hook何时触发组合规则
transform_context每次调模型之前,处理消息列表链式 —— 每个收到上一个的输出
convert_to_llmprovider 序列化之前最后一个实现生效
transform_system_prompt每次调模型之前,处理 system prompt链式
before_tool_call每个工具调用之前(在参数校验后)第一个 block=True 短路
after_tool_call每个工具调用之后(在 execute 之后)后写覆盖先写
after_model_responseassistant 消息落定之后返回 TurnAction 控制流向
should_stop_after_turn每个轮次结束时任一返回 True 即停

通过 Agent(middleware=[...]) 传入。见 Middleware → 组合规则

Checkpointer

任何匹配下面 Protocol 的对象就是 Checkpointer:

class Checkpointer(Protocol):
async def load(self, thread_id: str) -> CheckpointData | None: ...
async def append(self, thread_id: str, messages: list[Message]) -> None: ...
async def save_extra(self, thread_id: str, extra: dict) -> None: ...

通过 Agent(checkpointer=cp, thread_id="…") 绑定到 Agent,循环就会在 每条消息落定时追加一行,并在第一次 prompt() 时恢复历史。内置后端: MemoryCheckpointerSQLiteCheckpointerPostgresCheckpointerMySQLCheckpointer。 见 Checkpointing → SQLite

HITL 为跨进程挂起/恢复新增了两个可选方法:save_pending_request / load_pending_request。所有第一方后���都已实现。见 HITL 指南

HITL(人机协同)

CubePi 内置了 cubepi.hitl 模块,用于 agent 需要暂停并等待人类输入的 场景:

  • 沙箱确认 —— 危险工具(bash、写入文件)在执行前需要人类 approve / deny / edit。
  • 运行中提问 —— agent 在运行中途向用户弹出一个结构化表单,等待回答。
from cubepi.hitl import InMemoryChannel, ConfirmToolCallMiddleware, ask_user_tool

channel = InMemoryChannel()

agent = Agent(
provider=, model=,
tools=[bash_tool, ask_user_tool(channel)],
middleware=[ConfirmToolCallMiddleware(channel, require_confirm={"bash"})],
channel=channel,
)

Channel 是一个可 await 的协程协作者:工具和中间件作者写 await channel.ask(...)await channel.confirm(...),channel 处理暂停。宿主代码(你的 web 应用 / TUI)订阅 channel.subscribe() 或 轮询 channel.pending,把请求渲染给用户,然后通过 channel.answer(qid, answer) 回填答案。

内置两种 channel 后端:

  • InMemoryChannel —— 单进程(CLI、notebook、测试)。
  • CheckpointedChannel —— 跨进程(web 服务)。挂起的请求持久化到 checkpointer;另一个进程可以在数小时后通过 Agent.respond(question_id=, answer=) 回答。

完整细节——三种 HITL 动词、两套内置中间件、跨进程挂起/恢复协议、事件、 追踪 span 和错误参考——见 HITL 指南

Tracer(可选)

Tracer 输出符合 OpenTelemetry GenAI 语义约定 的 span,任何 OTLP 后端(Jaeger、Tempo、Honeycomb、Datadog、AWS X-Ray 等)都能直接接收,无需额外 instrumentation。先装 extra:

pip install "cubepi[tracing]" # OTel SDK
pip install "cubepi[tracing-otlp]" # + OTLP/HTTP 导出器

然后用 async with 包住 Agent:

from cubepi.tracing import Tracer
from cubepi.tracing.exporters import JsonlSpanExporter

async with (
Tracer(
service_name="my-bot",
agent_name="assistant",
exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
) as tracer,
tracer.attached(agent),
):
await agent.prompt("…")

每次 run 会发出一个 invoke_agent 根 span,其下每轮 LLM 往返对应 一个 cubepi.turn,再嵌套 chat(CLIENT)和 execute_tool 子 span。默认不记录任何 prompt 内容或模型输出 —— 需要的话用 Tracer(record_content=True) 显式打开,搭配 redact 回调脱敏。配 合 Meter(...) 还能拿到 token / 时延 / TTFC 直方图。完整指南: 追踪 → 概览

拼起来

用户代码


┌──────────────────────────────────────────┐
│ Agent │
│ ├─ AgentState (messages, tools, …) │
│ ├─ Middleware ── compose_middleware() │
│ ├─ Checkpointer ── message_end 时追加 │
│ └─ run_agent_loop ◀──── 真实的循环 │
│ │ │
│ ▼ │
│ Provider.stream() → MessageStream │
│ │ │
│ └─ events → emit → subscribers │
└──────────────────────────────────────────┘

这张图就是整个框架。文档站的其余部分,都是细节而已。