Human-in-the-Loop (HITL)
CubePi's HITL channel lets an agent pause and ask a human before proceeding. It handles two recurring patterns with a single primitive:
- Sandbox tool confirmation โ a dangerous tool (bash, file writes, API mutations) needs approve / deny / edit from a human before running.
- Mid-run structured questions โ the agent needs a specific selection or form answer from the user before it can continue.
The channel is an await-able coroutine collaborator. Tool authors write
await channel.ask(...) and the channel handles the pause. Host code
(subscribers) renders the pending request and posts an answer. Two backends
cover the full spectrum:
InMemoryChannelโ CLI, notebooks, tests. Process dies, pending is lost.CheckpointedChannelโ web services. Persists the pending request to aCheckpointerso a different process (or the same process after restart) can pick up and answer hours later.
Architectureโ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Host (cubebox web service / CLI / TUI) โ
โ โ
โ subscribe to channel.pending โโโโโ answer / cancel โ
โโโโโโโโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโ
โ HitlChannel (Protocol) โ
โ confirm / approve / ask โ
โ pending / answer / cancel โ
โโโโโโโโฌโโโโโโโโโโโโโโโฌโโโโโโโ
โ โ
โโโโโโโโโโโโผโโโโ โโโโโโโโผโโโโโโโโโโโโโ
โInMemoryChannelโ โCheckpointedChannelโ
โ (Future+Queue)โ โ (Future + persi- โ
โ โ โ sts to Checkpo- โ
โ โ โ inter) โ
โโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโ
โ โ
โ used by โโโโโค
โ โ
โโโโโโโโโโโโผโโโโโโโโโโโโโโโผโโโโโโโโโโโโโ
โ ask_user tool ConfirmToolCallMW โ
โ (structured form) (approve/deny/ โ
โ edit per-tool) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโ
โ cubepi Agent loop โ
โ (BeforeToolCallResult โ
โ carries hitl_trace) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Channel typesโ
InMemoryChannelโ
For single-process use. Holds an asyncio.Future internally โ the host
calls channel.answer() from the same event loop.
from cubepi.hitl import InMemoryChannel
channel = InMemoryChannel(
default_timeout=180.0, # per-call timeout; None = no timeout (default)
)
CheckpointedChannelโ
For cross-process use. On every confirm / approve / ask, the HitlRequest
is persisted via Checkpointer.save_pending_request(thread_id, ...). On
channel.answer(), the pending is cleared. On HitlDetached (graceful
suspend), the pending stays so a later Agent.respond() can resume.
from cubepi.hitl import CheckpointedChannel
from cubepi.checkpointer.sqlite import SQLiteCheckpointer # or postgres / mysql
async with SQLiteCheckpointer("path/to.db") as cp:
channel = CheckpointedChannel(
checkpointer=cp,
thread_id="conversation-42",
default_timeout=None, # cross-process: typical to disable
allow_inside_custom_tool=False, # safety gate (see Cross-process & durability)
)
CheckpointedChannel.__init__ validates the checkpointer implements
save_pending_request and load_pending_request โ it fails early with
HitlError if not.
The three verbsโ
confirm(prompt, *, details, timeout, signal) โ boolโ
A simple yes/no question. The host answers True or False.
if await channel.confirm("Deploy to production?", details={"env": "prod"}):
await deploy()
approve(tool_name, tool_call_id, args, *, details, timeout, signal) โ ApproveAnswerโ
The sandbox-confirm verb. Returns an ApproveAnswer with one of three decisions:
| Decision | Result |
|---|---|
"approve" | Tool runs with the original args. |
"deny" | Tool is blocked; tool_result.is_error=True with details["hitl"]["decision"]="human_deny". |
"edit" | Tool runs with the edited args (re-validated against the tool's pydantic parameter model). |
For approve requests, the envelope's question_id is set to the LLM's
tool_call_id โ no separate UUID, so host code can correlate by the same
ID it already tracks from the tool stream.
ask(questions, *, timeout, signal) โ dict[str, str | list[str]]โ
A structured form with one or more Question objects. Each question can be:
- Free-text (
options=None) - Single-select (
options=[...],multi_select=False) - Multi-select (
options=[...],multi_select=True) - "Other" with input (option has
allow_input=Trueโ user types free text)
from cubepi.hitl.types import Question, Option
answers = await channel.ask([
Question(key="framework", prompt="Which framework?", options=[
Option(label="React", value="react"),
Option(label="Vue", value="vue"),
Option(label="Other", value="other", allow_input=True),
]),
Question(key="features", prompt="Enable:", multi_select=True, options=[
Option(label="Auth", value="auth"),
Option(label="Payments", value="payments"),
]),
])
# answers == {"framework": "react", "features": ["auth", "payments"]}
Timeoutโ
Both channels accept a default_timeout constructor arg and every verb
accepts a per-call timeout kwarg (per-call overrides default).
| Layer | API |
|---|---|
| Channel constructor | InMemoryChannel(default_timeout=30.0) |
| Per-call override | channel.confirm("ok?", timeout=10.0) |
| Per-call off | channel.approve(..., timeout=None) when channel default is set |
Timeout expiry raises HitlTimedOut(BaseException) from the agent-side
await. The surrounding tool or middleware translates it to
tool_result.is_error=True with details["hitl"]["decision"]="timed_out",
so the model sees a clean denial and can react naturally. The envelope's
HitlRequest.timeout_seconds is filled automatically so the frontend can
render a countdown.