Skip to main content
Version: Next ๐Ÿšง

Human-in-the-Loop (HITL)

CubePi's HITL channel lets an agent pause and ask a human before proceeding. It handles two recurring patterns with a single primitive:

  1. Sandbox tool confirmation โ€” a dangerous tool (bash, file writes, API mutations) needs approve / deny / edit from a human before running.
  2. Mid-run structured questions โ€” the agent needs a specific selection or form answer from the user before it can continue.

The channel is an await-able coroutine collaborator. Tool authors write await channel.ask(...) and the channel handles the pause. Host code (subscribers) renders the pending request and posts an answer. Two backends cover the full spectrum:

  • InMemoryChannel โ€” CLI, notebooks, tests. Process dies, pending is lost.
  • CheckpointedChannel โ€” web services. Persists the pending request to a Checkpointer so a different process (or the same process after restart) can pick up and answer hours later.

Architectureโ€‹

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Host (cubebox web service / CLI / TUI) โ”‚
โ”‚ โ”‚
โ”‚ subscribe to channel.pending โ—„โ”€โ”€โ”€โ”€ answer / cancel โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ HitlChannel (Protocol) โ”‚
โ”‚ confirm / approve / ask โ”‚
โ”‚ pending / answer / cancel โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚ โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚InMemoryChannelโ”‚ โ”‚CheckpointedChannelโ”‚
โ”‚ (Future+Queue)โ”‚ โ”‚ (Future + persi- โ”‚
โ”‚ โ”‚ โ”‚ sts to Checkpo- โ”‚
โ”‚ โ”‚ โ”‚ inter) โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚ โ”‚
โ”‚ used by โ”€โ”€โ”€โ”€โ”ค
โ”‚ โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ ask_user tool ConfirmToolCallMW โ”‚
โ”‚ (structured form) (approve/deny/ โ”‚
โ”‚ edit per-tool) โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ cubepi Agent loop โ”‚
โ”‚ (BeforeToolCallResult โ”‚
โ”‚ carries hitl_trace) โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Channel typesโ€‹

InMemoryChannelโ€‹

For single-process use. Holds an asyncio.Future internally โ€” the host calls channel.answer() from the same event loop.

from cubepi.hitl import InMemoryChannel

channel = InMemoryChannel(
default_timeout=180.0, # per-call timeout; None = no timeout (default)
)

CheckpointedChannelโ€‹

For cross-process use. On every confirm / approve / ask, the HitlRequest is persisted via Checkpointer.save_pending_request(thread_id, ...). On channel.answer(), the pending is cleared. On HitlDetached (graceful suspend), the pending stays so a later Agent.respond() can resume.

from cubepi.hitl import CheckpointedChannel
from cubepi.checkpointer.sqlite import SQLiteCheckpointer # or postgres / mysql

async with SQLiteCheckpointer("path/to.db") as cp:
channel = CheckpointedChannel(
checkpointer=cp,
thread_id="conversation-42",
default_timeout=None, # cross-process: typical to disable
allow_inside_custom_tool=False, # safety gate (see Cross-process & durability)
)

CheckpointedChannel.__init__ validates the checkpointer implements save_pending_request and load_pending_request โ€” it fails early with HitlError if not.

The three verbsโ€‹

confirm(prompt, *, details, timeout, signal) โ†’ boolโ€‹

A simple yes/no question. The host answers True or False.

if await channel.confirm("Deploy to production?", details={"env": "prod"}):
await deploy()

approve(tool_name, tool_call_id, args, *, details, timeout, signal) โ†’ ApproveAnswerโ€‹

The sandbox-confirm verb. Returns an ApproveAnswer with one of three decisions:

DecisionResult
"approve"Tool runs with the original args.
"deny"Tool is blocked; tool_result.is_error=True with details["hitl"]["decision"]="human_deny".
"edit"Tool runs with the edited args (re-validated against the tool's pydantic parameter model).

For approve requests, the envelope's question_id is set to the LLM's tool_call_id โ€” no separate UUID, so host code can correlate by the same ID it already tracks from the tool stream.

ask(questions, *, timeout, signal) โ†’ dict[str, str | list[str]]โ€‹

A structured form with one or more Question objects. Each question can be:

  • Free-text (options=None)
  • Single-select (options=[...], multi_select=False)
  • Multi-select (options=[...], multi_select=True)
  • "Other" with input (option has allow_input=True โ€” user types free text)
from cubepi.hitl.types import Question, Option

answers = await channel.ask([
Question(key="framework", prompt="Which framework?", options=[
Option(label="React", value="react"),
Option(label="Vue", value="vue"),
Option(label="Other", value="other", allow_input=True),
]),
Question(key="features", prompt="Enable:", multi_select=True, options=[
Option(label="Auth", value="auth"),
Option(label="Payments", value="payments"),
]),
])
# answers == {"framework": "react", "features": ["auth", "payments"]}

Timeoutโ€‹

Both channels accept a default_timeout constructor arg and every verb accepts a per-call timeout kwarg (per-call overrides default).

LayerAPI
Channel constructorInMemoryChannel(default_timeout=30.0)
Per-call overridechannel.confirm("ok?", timeout=10.0)
Per-call offchannel.approve(..., timeout=None) when channel default is set

Timeout expiry raises HitlTimedOut(BaseException) from the agent-side await. The surrounding tool or middleware translates it to tool_result.is_error=True with details["hitl"]["decision"]="timed_out", so the model sees a clean denial and can react naturally. The envelope's HitlRequest.timeout_seconds is filled automatically so the frontend can render a countdown.