跳到主要内容
版本:0.6

Recipe:可恢复长任务

当 agent 正在执行一个长时间运行的操作(一系列工具调用、多轮推理会话) 时,如果进程崩溃,你希望能从中断处继续,而不是从头开始。 CubePi 的追加写入 checkpointing 加上 agent.resume()轮次之间 的恢复变得轻而易举;而工具执行中途的恢复则需要多一些准备。

预计耗时: 15 分钟。 依赖: cubepi[sqlite]ANTHROPIC_API_KEY

模式概述

有三种崩溃点需要考虑:

  1. 轮次之间 —— 模型已回复,无工具可运行,循环在迭代间等待。 resume() 会重新调用模型。使用 checkpointer 即免费获得。
  2. 工具结果之后、模型调用之前 —— 工具结果已持久化。resume() 看到最后一条消息是 ToolResultMessage,重新调用模型。 使用 checkpointer 即免费获得。
  3. 工具执行中途 —— 工具已启动但未完成。尚未持久化任何内容 (CubePi 只持久化消息)。需要工具内部的幂等性。需要额外处理。

本 recipe 重点关注第 3 种情况。

带外部状态的幂等工具

模式:每个工具操作都有一个确定性的幂等键。在执行工作之前,先检查 是否已经完成过。

tools.py
import os
import json
from pathlib import Path

from pydantic import BaseModel
from cubepi import AgentTool, AgentToolResult, TextContent


# 简单的文件支撑 job store;生产中替换为 Redis / Postgres。
JOB_DIR = Path(os.environ.get("JOB_DIR", "/tmp/cubepi-jobs"))
JOB_DIR.mkdir(parents=True, exist_ok=True)


class TranscodeParams(BaseModel):
source_path: str
output_path: str


async def transcode_video(tool_call_id, params: TranscodeParams, *, signal=None, on_update=None):
job_key = f"transcode:{params.source_path}->{params.output_path}"
job_file = JOB_DIR / f"{job_key.replace('/', '_')}.json"

if job_file.exists():
# 上次运行已完成。
state = json.loads(job_file.read_text())
return AgentToolResult(
content=[TextContent(text=f"Already transcoded to {state['output_path']}.")],
details=state,
)

# 执行实际工作(长时间运行,代价高昂)。
# 使用 signal 在取消时干净地中止。
output = await run_ffmpeg(params.source_path, params.output_path, signal=signal)

# 在工作成功后再写入 job 完成标记。
job_file.write_text(json.dumps({"output_path": output}))

return AgentToolResult(
content=[TextContent(text=f"Transcoded to {output}.")],
details={"output_path": output},
)


transcode_tool = AgentTool(
name="transcode_video",
description="Transcode a video file. Idempotent — safe to retry.",
parameters=TranscodeParams,
execute=transcode_video,
execution_mode="sequential", # 一次只转码一个
)

现在,如果进程在 run_ffmpeg 执行期间崩溃,下次 agent 运行时会发现 job_file.exists() == False,重新执行工作,并且只在成功后写入标记。 如果进程在标记已写入后崩溃,下次运行会发现标记,立即返回缓存结果, agent 继续运行,就好像刚刚完成了一样。

恢复 agent

resume.py
import asyncio
import os
import sys

from cubepi import Agent, Model
from cubepi.checkpointer import SQLiteCheckpointer
from cubepi.providers.anthropic import AnthropicProvider

from tools import transcode_tool # 上面封装好的 AgentTool


async def main(thread_id: str, initial_prompt: str | None):
async with SQLiteCheckpointer("jobs.db") as cp:
agent = Agent(
provider=AnthropicProvider(api_key=os.environ["ANTHROPIC_API_KEY"]),
model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
system_prompt="You orchestrate video transcoding jobs.",
tools=[transcode_tool],
checkpointer=cp,
thread_id=thread_id,
)
agent.subscribe(lambda e, s=None: None)

if initial_prompt:
# 全新运行。prompt() 在第一次调用前自动加载历史记录,
# 然后追加新的用户消息。
await agent.prompt(initial_prompt)
else:
# 恢复。agent.resume() 不会自动加载 —— 只有 prompt() 会。
# 需要先手动恢复 agent 状态。
data = await cp.load(thread_id)
if data is None:
raise RuntimeError(f"No saved state for thread {thread_id!r}")
agent.state.messages = list(data.messages)
# `extra` 也会被恢复;它在 Agent 上是私有的,如果你的
# middleware 需要读取,请使用 checkpointer 的视图。

# resume 从最后一条持久化消息继续:
# ToolResultMessage / UserMessage → 重新调用模型
# 没有排队 steer/follow_up 的 AssistantMessage → 抛出异常
await agent.resume()


if __name__ == "__main__":
thread_id = sys.argv[1]
initial = sys.argv[2] if len(sys.argv) > 2 else None
asyncio.run(main(thread_id, initial))

工作流:

# 启动一个 job:
python resume.py job-1 "Transcode /videos/a.mov to /out/a.mp4 and /videos/b.mov to /out/b.mp4"

# 中途终止:Ctrl-C。

# 恢复 —— agent 从最后一条持久化消息继续:
python resume.py job-1

三种恢复场景的代码

async def smart_resume(agent, cp, thread_id):
# resume() 不会自动加载 —— 如果 agent 状态为空,先手动恢复。
if not agent.state.messages:
data = await cp.load(thread_id)
if data is None or not data.messages:
return False # 没有可恢复的内容
agent.state.messages = list(data.messages)

last = agent.state.messages[-1]
last_role = type(last).__name__

if last_role == "AssistantMessage":
# 要么自然结束,要么在一轮结束后立即崩溃。
# 除非有排队的 steering,否则 resume() 会抛出。
# 最简单的方案:询问用户下一步做什么。
return False

# 最后一条是 UserMessage 或 ToolResultMessage —— 可以安全恢复。
await agent.resume()
return True

持久化与中止

agent.abort() 触发干净的拆卸并发射 agent_end。最后一条完全持久化 的消息是通过 message_end 提交的那条。工具执行期间的中止不会持久化 工具结果(工具未返回),因此 resume() 会使用包含未完成 ToolCall 的最后一条 AssistantMessage 重新调度模型。模型通常会重新发出调用 —— 你的幂等性保护机制会处理剩下的一切。

关于持久化部分工具状态

CubePi 不提供"持久化部分工具结果"的 API。预期的模式是:将部分状态 保存在工具自己的外部存储中(文件系统、Redis、S3),以工具参数为键 进行确定性寻址。上面的 transcode_videoJOB_DIR 实现的就是这种模式。

常见陷阱

  • 非幂等工具 —— 没有确定性键,重试可能导致信用卡被扣两次或发送 重复邮件。始终用幂等键包裹外部副作用。
  • job 标记放在 /tmp —— 重启时会被清除。生产 job 请使用真正的 持久化层。
  • 在无队列的 assistant 消息后调用 resume() —— 会抛出异常。 要么向用户询问下一条消息,要么重新调用 prompt()
  • 在全新 agent 上调用 resume() —— 会抛出 No messages to continue fromresume() 不会从 checkpointer 自动加载;只有 prompt() 会。 请先手动恢复:agent.state.messages = (await cp.load(thread_id)).messages
  • 忘记在工具内部检查 signal —— 一个长时间运行的 await asyncio.sleep(...) 或忽略 signal.is_set()for ... in stream 不会响应 abort。请在任何热循环内部添加检查。

另请参见