cubepi.tracing
tracing_context
function
tracing_context(*, tags: list[str] | tuple[str, ...] | None = None, metadata: dict[str, Any] | None = None) -> Iterator[None]
Scope tags / metadata onto agent runs started inside this block.
The recorder reads these contextvars on AgentStartEvent and
stamps them on the invoke_agent span as:
- cubepi.tags: a tuple of strings (OTel attribute type)
- one attribute per metadata key, namespaced under cubepi.metadata.* (e.g. metadata={"user_id": "u-42"} → cubepi.metadata.user_id = "u-42"). The dedicated sub-namespace keeps recorder-owned schema keys (cubepi.run_id, cubepi.turn.index, …) safe from caller-supplied collisions.
Because the values live in contextvars, this works for concurrent agents: each asyncio task tree gets its own value. Nested blocks merge additively (tags concatenate; metadata is a union with inner keys winning).
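The additive merge described above can be illustrated with a small stand-in built only on the standard library. This is a sketch of the semantics, not cubepi's implementation: `scoped_context`, `current_scope`, and the two contextvar names are hypothetical.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical stand-ins for the contextvars the recorder reads.
_TAGS = contextvars.ContextVar("sketch_tags", default=())
_METADATA = contextvars.ContextVar("sketch_metadata", default={})


@contextmanager
def scoped_context(*, tags=None, metadata=None):
    """Merge tags/metadata into the enclosing scope and restore it on exit."""
    merged_tags = _TAGS.get() + tuple(tags or ())          # tags concatenate
    merged_meta = {**_METADATA.get(), **(metadata or {})}  # inner keys win
    tags_token = _TAGS.set(merged_tags)
    meta_token = _METADATA.set(merged_meta)
    try:
        yield
    finally:
        _METADATA.reset(meta_token)
        _TAGS.reset(tags_token)


def current_scope():
    """Return the (tags, metadata) pair visible at this point."""
    return _TAGS.get(), dict(_METADATA.get())


with scoped_context(tags=["prod"], metadata={"user_id": "u-42", "tier": "free"}):
    with scoped_context(tags=("beta",), metadata={"tier": "pro"}):
        inner = current_scope()
    outer = current_scope()
after = current_scope()
```

Here `inner` sees both tags and the overridden `tier`, while `outer` and `after` show that each block restores the previous values when it exits.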
Args
- tags: Tags to apply to runs started in this scope. Stored as a tuple on the span so it round-trips through OTel's attribute serializer.
- metadata: Per-run key/value pairs. Values must be types that OTel attributes accept (str, bool, int, float, or a tuple/list of those); other shapes will be silently dropped by the recorder.
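To make the metadata rule concrete, the following sketch shows one way values could be filtered and namespaced. The helper names `is_otel_value` and `metadata_attributes` are hypothetical, and the real recorder may apply stricter checks (for example, this sketch does not verify that sequences are homogeneous).

```python
from typing import Any

_SCALARS = (str, bool, int, float)


def is_otel_value(value: Any) -> bool:
    """True for a scalar, or a list/tuple of scalars, as described above."""
    if isinstance(value, _SCALARS):
        return True
    if isinstance(value, (list, tuple)):
        return all(isinstance(item, _SCALARS) for item in value)
    return False


def metadata_attributes(metadata: dict) -> dict:
    """Namespace each accepted key under cubepi.metadata.*; drop the rest."""
    attrs = {}
    for key, value in metadata.items():
        if not is_otel_value(value):
            continue  # unsupported shape: silently dropped
        # Lists become tuples so the stored value is immutable.
        attrs[f"cubepi.metadata.{key}"] = tuple(value) if isinstance(value, list) else value
    return attrs


attrs = metadata_attributes(
    {"user_id": "u-42", "retries": 3, "labels": ["a", "b"], "nested": {"x": 1}, "missing": None}
)
```

In this example the `nested` dict and the `None` value do not survive, which is why callers may want to stringify such values before passing them in.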
JsonlSpanExporter
class
JsonlSpanExporter(self, directory: str | os.PathLike[str] = './cubepi-traces')
Write each ReadableSpan as one JSON line.
Files: <directory>/<YYYY-MM-DD>/<trace_id>.jsonl, one file per trace
so the parent run and any nested subagent runs (same trace_id, different
cubepi.run_id) land together. The date used for the subdirectory is
the span's start time (UTC). The trace_id is the span's 32-hex trace
id; spans without a context fall back to "unknown-trace".
Permissions: files are created mode 0o600 (user-only).
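The file layout and permissions above could be reproduced roughly as follows. This is a standard-library sketch under stated assumptions, not the exporter's code: `trace_file_path` and `append_json_line` are hypothetical helpers, the start time is assumed to be in nanoseconds since the epoch, and the trace id is assumed to be an integer.

```python
import json
import os
from datetime import datetime, timezone
from pathlib import Path


def trace_file_path(directory, start_time_ns, trace_id):
    """Build <directory>/<YYYY-MM-DD>/<trace_id>.jsonl from a span's start time (UTC)."""
    seconds = start_time_ns // 1_000_000_000
    day = datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d")
    # 32 lowercase hex digits, or the documented fallback when there is no context.
    name = format(trace_id, "032x") if trace_id is not None else "unknown-trace"
    return Path(directory) / day / f"{name}.jsonl"


def append_json_line(path, record):
    """Append one JSON object per line, creating the file as user-only (0o600)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
    with os.fdopen(fd, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(record) + "\n")


example_path = trace_file_path("cubepi-traces", 1_700_000_000 * 10**9, 0xABC)
```

Because the path depends only on the start date and the trace id, spans from a parent run and its subagent runs append to the same file as long as they share a trace.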
Methods
export
export(spans: Sequence[ReadableSpan]) -> SpanExportResult
shutdown
shutdown() -> None
force_flush
force_flush(timeout_millis: int = 30000) -> bool
Meter
class
Meter(self, *, resource: Resource | None = None, exporters: list[MetricExporter] | None = None, export_interval_millis: int = 60000)
Emit OTel GenAI histograms alongside the cubepi Tracer.
Construct with a list of MetricExporter instances (e.g. the
OTLP metric exporter); call attach to start emitting.
Example:
from cubepi.tracing import Tracer, Meter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
    OTLPMetricExporter,
)
tracer = Tracer(service_name="my-bot", exporters=[...])
meter = Meter(
    resource=tracer.resource,
    exporters=[OTLPMetricExporter(endpoint="http://collector:4318/v1/metrics")],
)
tracer.attach(agent)
meter.attach(agent)
Attributes
- otel_meter (Any): The underlying opentelemetry.metrics.Meter, exposed so callers can register their own instruments.
Methods
attach
attach(agent: 'Agent') -> Callable[[], None]
Subscribe to agent and start emitting metrics.
Each attach() call gets its own _MeterState, so
multiple concurrent agents attached to the same Meter don't
clobber each other's open-ns timestamps and attribute dicts.
Previously all state lived on the Meter instance, so a second
agent's provider_request overwrote the first agent's
_chat_open_ns / _chat_attrs before its response landed.
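The per-attach isolation can be pictured with a closure that owns its own state object. This is an illustrative sketch only: `AttachState`, `SketchMeter`, and the two callbacks are hypothetical names and do not reflect the actual Meter internals.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AttachState:
    """State owned by a single attach() call (hypothetical stand-in)."""
    chat_open_ns: Optional[int] = None
    chat_attrs: dict = field(default_factory=dict)


class SketchMeter:
    def __init__(self):
        self.durations = []  # (model, elapsed_ns) pairs shared across attaches

    def attach(self, agent_name):
        state = AttachState()  # fresh state per attach, never stored on self

        def on_request(now_ns, attrs):
            state.chat_open_ns = now_ns
            state.chat_attrs = dict(attrs)

        def on_response(now_ns):
            elapsed = now_ns - state.chat_open_ns
            self.durations.append((state.chat_attrs["model"], elapsed))

        return on_request, on_response


meter = SketchMeter()
a_request, a_response = meter.attach("agent-a")
b_request, b_response = meter.attach("agent-b")

# Interleaved requests: agent-b starts before agent-a's response lands.
a_request(100, {"model": "a"})
b_request(150, {"model": "b"})
a_response(400)
b_response(500)
```

If the open timestamp lived on the meter itself, the second request would have overwritten the first and the duration for `a` would be wrong; with per-attach state both durations stay correct.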
force_flush
force_flush(timeout_seconds: float = 30.0) -> bool
shutdown
shutdown(timeout_seconds: float = 30.0) -> None
attached
attached(agent: 'Agent') -> AsyncIterator['Meter']
RAII wrapper around attach.
async with enters by attaching the per-attach state, exits
by calling the detach callable returned from attach.
Mirrors Tracer.attached; use both for the cleanest
end-to-end shutdown:
async with (
    Tracer(...) as tracer,
    Meter(resource=tracer.resource, ...) as meter,
    tracer.attached(agent),
    meter.attached(agent),
):
    await agent.prompt("...")
# auto: detach both + shutdown both
SCHEMA_URL
attribute
SCHEMA_URL = 'https://opentelemetry.io/schemas/1.41.0'
Tracer
class
Tracer(self, *, service_name: str | None = None, service_version: str | None = None, service_namespace: str | None = None, service_instance_id: str | None = None, deployment_environment: str | None = None, agent_name: str | None = None, agent_id: str | None = None, agent_description: str | None = None, agent_version: str | None = None, exporters: list[SpanExporter] | None = None, record_content: bool = False, record_stream: bool = False, stream_dir: 'str | Path | None' = None, redact: 'Callable[[str, Any], Any] | None' = None, resource: Resource | None = None, atexit_flush: bool = True, atexit_flush_timeout_seconds: float = 5.0)
Attaches OTel-compatible tracing to a cubepi Agent.
Construct once per process (or per service). Each attach(agent)
call wires the cubepi recorder to the agent's event stream and
provider listener registry; the returned callable detaches.
Example
from cubepi.tracing import Tracer
from cubepi.tracing.exporters import JsonlSpanExporter
tracer = Tracer(
    service_name="my-bot",
    agent_name="coding-agent",
    exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
)
detach = tracer.attach(agent)
try:
    ...
finally:
    detach()
    await tracer.shutdown()
Attributes
- resource (Resource)
- otel_tracer (Any): The underlying opentelemetry.trace.Tracer instance.
- redact (Callable[[str, Any], Any] | None): Optional (key, value) -> value filter applied at every set_attribute site for content attributes.
Methods
attach
attach(agent: 'Agent') -> Callable[[], Any]
Wire the cubepi recorder to agent.
Returns a detach() callable. When invoked:
- Synchronously: unsubscribes every hook, closes any spans still open from a cancelled run, and sweeps MCP tool-span registrations. These effects are observable on the next line.
- Schedules a flush on the running event loop and returns the resulting asyncio.Task so callers can await detach() to block until buffered spans have been exported. Outside an async context it returns None; call await tracer.shutdown() separately to flush.
Either await detach() or await tracer.shutdown() (or
both) must be used in the caller's finally block; the
synchronous detach() alone does not guarantee that ended
spans have left BatchSpanProcessor.
Also registers this Tracer's private TracerProvider with
cubepi.mcp._tracing so MCP CLIENT spans flow through
the same exporters; without this step, the MCP module falls
back to the OTel global default (a no-op provider when the
caller didn't separately call trace.set_tracer_provider).
The detach callable unregisters.
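The "Task inside a running loop, None outside" behaviour of the detach callable can be sketched with plain asyncio. The factory `make_detach` and the `fake_flush` coroutine are hypothetical; the real callable also performs the synchronous cleanup listed above, which is only hinted at in a comment here.

```python
import asyncio


def make_detach(flush):
    """Return a detach() that schedules `flush` (a coroutine function) when a loop is running."""

    def detach():
        # Synchronous cleanup (unsubscribing hooks, closing open spans) would happen here.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return None  # no running loop: the caller must flush separately
        return loop.create_task(flush())

    return detach


flushed = []


async def fake_flush():
    flushed.append("flushed")


detach = make_detach(fake_flush)
outside_result = detach()  # called with no running loop


async def main():
    task = detach()  # called inside a running loop
    await task       # blocks until the flush has completed
    return isinstance(task, asyncio.Task)


inside_is_task = asyncio.run(main())
```

Awaiting the returned task is what gives the caller a point at which the flush is known to have finished; the synchronous call alone only schedules it.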
force_flush
force_flush(timeout_seconds: float = 30.0) -> bool
Block until all currently buffered spans are exported.
Returns False on timeout.
shutdown
shutdown(timeout_seconds: float = 30.0) -> None
Flush and close all exporters. Tracer is unusable after this.
attached
attached(agent: 'Agent') -> AsyncIterator['Tracer']
RAII wrapper around attach.
async with enters by attaching the recorder, exits by
running detach and (in an async context) awaiting its returned
flush Task. Equivalent to:
detach = tracer.attach(agent)
try:
    ...
finally:
    result = detach()
    if result is not None:  # async context: it's a Task
        await result
Use this instead of the bare attach / finally: detach()
pattern when you want the cleanup tied to a single async with block. Combines with Tracer's own context
manager:
async with Tracer(...) as tracer, tracer.attached(agent):
    await agent.prompt("...")
# auto: detach (closes cancelled spans) + shutdown (flush + close)
trace
function
trace(tracer: 'Tracer | None', agent: 'Agent') -> AsyncIterator[None]
Best-effort tracing scope for one agent run.
Attaches tracer to agent on enter and detaches + flushes its
buffered spans on exit. Every tracing fault (a failed attach, detach, or
flush) is logged and swallowed, so tracing can never break or fail the
work inside the async with block. Passing tracer=None makes the
block a no-op, which lets callers gate tracing on config without branching
at the call site.
This does not shut the tracer down: the tracer is reusable across runs,
so build it once (e.g. per process) and call await tracer.shutdown()
when the owning process stops.
Unlike Tracer.attached, which surfaces flush failures to the caller,
this helper swallows them; use it when tracing is auxiliary to the work and
must never affect its outcome.
Example
async with trace(tracer, agent):
    await agent.prompt("...")
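For readers who want to see the swallow-and-log behaviour spelled out, here is a minimal sketch of such a helper. It is an approximation under stated assumptions, not the library's source: `best_effort_trace` and `BrokenTracer` are hypothetical, and the tracer is only assumed to expose an `attach(agent)` method that returns a detach callable.

```python
import asyncio
import logging
from contextlib import asynccontextmanager

log = logging.getLogger("tracing-sketch")


@asynccontextmanager
async def best_effort_trace(tracer, agent):
    """Attach on enter, detach and flush on exit; tracing faults are logged, never raised."""
    detach = None
    if tracer is not None:  # tracer=None turns the block into a no-op
        try:
            detach = tracer.attach(agent)
        except Exception:
            log.exception("attach failed; continuing without tracing")
    try:
        yield  # errors raised by the caller's own work still propagate
    finally:
        if detach is not None:
            try:
                result = detach()
                if result is not None:  # a flush Task when a loop is running
                    await result
            except Exception:
                log.exception("detach or flush failed; ignoring")


class BrokenTracer:
    """Hypothetical tracer whose attach always fails."""

    def attach(self, agent):
        raise RuntimeError("exporter unavailable")


async def demo():
    ran = []
    async with best_effort_trace(BrokenTracer(), agent=object()):
        ran.append("work")      # still runs despite the failed attach
    async with best_effort_trace(None, agent=object()):
        ran.append("untraced")  # no-op scope
    return ran
```

Running `asyncio.run(demo())` completes both blocks even though the first tracer cannot attach, which matches the "tracing is auxiliary" contract described above.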